diff options
author | Tanu Kaskinen <tanuk@iki.fi> | 2018-03-13 19:40:38 +0200 |
---|---|---|
committer | Tanu Kaskinen <tanuk@iki.fi> | 2018-03-16 20:05:38 +0200 |
commit | b2537a8f38ad71e4dee57263310235abdf2b95a4 (patch) | |
tree | ec877552e8aa38047f2db65775b2282c53800426 | |
parent | 73b8a57078b94033edf84de2fc0cfbe344c10dcd (diff) | |
download | pulseaudio-b2537a8f38ad71e4dee57263310235abdf2b95a4.tar.gz |
replace sink/source SET_STATE handlers with callbacks
There are no behaviour changes, the code from almost all the SET_STATE
handlers is moved with minimal changes to the newly introduced
set_state_in_io_thread() callback. The only exception is module-tunnel,
which has to call pa_sink_render() after pa_sink.thread_info.state has
been updated. The set_state_in_io_thread() callback is called before
updating that variable, so moving the SET_STATE handler code to the
callback isn't possible.
The purpose of this change is to make it easier to get state change
handling right in modules. Hooking to the SET_STATE messages in modules
required care in calling pa_sink/source_process_msg() at the right time
(or not calling it at all, as was the case on resume failures), and
there were a few bugs (fixed before this patch). Now the core takes care
of ordering things correctly.
Another motivation for this change is that there was some talk about
adding a suspend_cause variable to pa_sink/source.thread_info. The
variable would be updated in the core SET_STATE handler, but that would
not work with the old design, because in case of resume failures modules
didn't call the core message handler.
25 files changed, 849 insertions, 633 deletions
diff --git a/src/modules/alsa/alsa-sink.c b/src/modules/alsa/alsa-sink.c index dca8f6939..9e335374c 100644 --- a/src/modules/alsa/alsa-sink.c +++ b/src/modules/alsa/alsa-sink.c @@ -1184,46 +1184,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } - - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: { - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - suspend(u); - - break; - } - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: { - int r; - - if (u->sink->thread_info.state == PA_SINK_INIT) { - if (build_pollfd(u) < 0) - /* FIXME: This will cause an assertion failure in - * pa_sink_put(), because with the current design - * pa_sink_put() is not allowed to fail. */ - return -PA_ERR_IO; - } - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - if ((r = unsuspend(u)) < 0) - return r; - } - - break; - } - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - - break; } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -1248,6 +1208,54 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t new_stat return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: { + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + suspend(u); + + break; + } + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: { + int r; + + if (u->sink->thread_info.state == PA_SINK_INIT) { + if (build_pollfd(u) < 0) + /* FIXME: This will cause an assertion failure, because + * with the current design pa_sink_put() is not allowed + * to fail and pa_sink_put() has no fallback code that + * would start the sink suspended if opening the device + * fails. */ + return -PA_ERR_IO; + } + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + if ((r = unsuspend(u)) < 0) + return r; + } + + break; + } + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) { struct userdata *u = snd_mixer_elem_get_callback_private(elem); @@ -2360,6 +2368,7 @@ pa_sink *pa_alsa_sink_new(pa_module *m, pa_modargs *ma, const char*driver, pa_ca if (u->use_tsched) u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->ucm_context) u->sink->set_port = sink_set_port_ucm_cb; else diff --git a/src/modules/alsa/alsa-source.c b/src/modules/alsa/alsa-source.c index b3adc7e79..312b2596a 100644 --- a/src/modules/alsa/alsa-source.c +++ b/src/modules/alsa/alsa-source.c @@ -1039,46 +1039,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return 0; } - - case PA_SOURCE_MESSAGE_SET_STATE: - - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SOURCE_SUSPENDED: { - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - - suspend(u); - - break; - } - - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: { - int r; - - if (u->source->thread_info.state == PA_SOURCE_INIT) { - if (build_pollfd(u) < 0) - /* FIXME: This will cause an assertion failure in - * pa_source_put(), because with the current design - * pa_source_put() is not allowed to fail. */ - return -PA_ERR_IO; - } - - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - if ((r = unsuspend(u)) < 0) - return r; - } - - break; - } - - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; - } - - break; } return pa_source_process_msg(o, code, data, offset, chunk); @@ -1103,6 +1063,54 @@ static int source_set_state_in_main_thread_cb(pa_source *s, pa_source_state_t ne return 0; } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: { + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + + suspend(u); + + break; + } + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: { + int r; + + if (u->source->thread_info.state == PA_SOURCE_INIT) { + if (build_pollfd(u) < 0) + /* FIXME: This will cause an assertion failure, because + * with the current design pa_source_put() is not allowed + * to fail and pa_source_put() has no fallback code that + * would start the source suspended if opening the device + * fails. */ + return -PA_ERR_IO; + } + + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + if ((r = unsuspend(u)) < 0) + return r; + } + + break; + } + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; + } + + return 0; +} + static int ctl_mixer_callback(snd_mixer_elem_t *elem, unsigned int mask) { struct userdata *u = snd_mixer_elem_get_callback_private(elem); @@ -2036,6 +2044,7 @@ pa_source *pa_alsa_source_new(pa_module *m, pa_modargs *ma, const char*driver, p if (u->use_tsched) u->source->update_requested_latency = source_update_requested_latency_cb; u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; if (u->ucm_context) u->source->set_port = source_set_port_ucm_cb; else diff --git a/src/modules/bluetooth/module-bluez4-device.c b/src/modules/bluetooth/module-bluez4-device.c index c6baee84c..85eb7986b 100644 --- a/src/modules/bluetooth/module-bluez4-device.c +++ b/src/modules/bluetooth/module-bluez4-device.c @@ -386,45 +386,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ - if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) - break; - - /* Stop the device if the source is suspended as well */ - if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) - /* We deliberately ignore whether stopping - * actually worked. Since the stream_fd is - * closed it doesn't really matter */ - bt_transport_release(u); - - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - if (u->sink->thread_info.state != PA_SINK_SUSPENDED) - break; - - /* Resume the device if the source was suspended as well */ - if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { - if (bt_transport_acquire(u, false) < 0) - return -1; - else - setup_stream(u); - } - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - break; - case PA_SINK_MESSAGE_GET_LATENCY: { if (u->read_smoother) { @@ -451,55 +412,61 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } -/* Run from IO thread */ -static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { - struct userdata *u = PA_SOURCE(o)->userdata; - - pa_assert(u->source == PA_SOURCE(o)); - pa_assert(u->transport); - - switch (code) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; - case PA_SOURCE_MESSAGE_SET_STATE: + pa_assert(s); + pa_assert_se(u = s->userdata); - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { + switch (new_state) { - case PA_SOURCE_SUSPENDED: - /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ - if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) - break; + case PA_SINK_SUSPENDED: + /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) + break; - /* Stop the device if the sink is suspended as well */ - if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) - bt_transport_release(u); + /* Stop the device if the source is suspended as well */ + if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) + /* We deliberately ignore whether stopping + * actually worked. Since the stream_fd is + * closed it doesn't really matter */ + bt_transport_release(u); - if (u->read_smoother) - pa_smoother_pause(u->read_smoother, pa_rtclock_now()); - break; + break; - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: - if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) - break; - - /* Resume the device if the sink was suspended as well */ - if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { - if (bt_transport_acquire(u, false) < 0) - return -1; - else - setup_stream(u); - } - /* We don't resume the smoother here. Instead we - * wait until the first packet arrives */ - break; + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + if (u->sink->thread_info.state != PA_SINK_SUSPENDED) + break; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + /* Resume the device if the source was suspended as well */ + if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { + if (bt_transport_acquire(u, false) < 0) + return -1; + else + setup_stream(u); } break; + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + +/* Run from IO thread */ +static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { + struct userdata *u = PA_SOURCE(o)->userdata; + + pa_assert(u->source == PA_SOURCE(o)); + pa_assert(u->transport); + + switch (code) { + case PA_SOURCE_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -519,6 +486,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: + /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ + if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + break; + + /* Stop the device if the sink is suspended as well */ + if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) + bt_transport_release(u); + + if (u->read_smoother) + pa_smoother_pause(u->read_smoother, pa_rtclock_now()); + break; + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) + break; + + /* Resume the device if the sink was suspended as well */ + if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { + if (bt_transport_acquire(u, false) < 0) + return -1; + else + setup_stream(u); + } + /* We don't resume the smoother here. Instead we + * wait until the first packet arrives */ + break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + break; + } + + return 0; +} + /* Called from main thread context */ static int device_process_msg(pa_msgobject *obj, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct bluetooth_msg *u = BLUETOOTH_MSG(obj); @@ -1591,6 +1605,7 @@ static int add_sink(struct userdata *u) { u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->set_port = sink_set_port_cb; } @@ -1663,6 +1678,7 @@ static int add_source(struct userdata *u) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->set_port = source_set_port_cb; } diff --git a/src/modules/bluetooth/module-bluez5-device.c b/src/modules/bluetooth/module-bluez5-device.c index b83f0eafa..5e189ba24 100644 --- a/src/modules/bluetooth/module-bluez5-device.c +++ b/src/modules/bluetooth/module-bluez5-device.c @@ -891,48 +891,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SOURCE_SUSPENDED: - /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ - if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) - break; - - /* Stop the device if the sink is suspended as well */ - if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) - transport_release(u); - - if (u->read_smoother) - pa_smoother_pause(u->read_smoother, pa_rtclock_now()); - - break; - - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: - if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) - break; - - /* Resume the device if the sink was suspended as well */ - if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) { - if (!setup_transport_and_stream(u)) - return -1; - } - - /* We don't resume the smoother here. Instead we - * wait until the first packet arrives */ - - break; - - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - break; - } - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -956,6 +914,53 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SOURCE_SUSPENDED: + /* Ignore if transition is PA_SOURCE_INIT->PA_SOURCE_SUSPENDED */ + if (!PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + break; + + /* Stop the device if the sink is suspended as well */ + if (!u->sink || u->sink->state == PA_SINK_SUSPENDED) + transport_release(u); + + if (u->read_smoother) + pa_smoother_pause(u->read_smoother, pa_rtclock_now()); + + break; + + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state != PA_SOURCE_SUSPENDED) + break; + + /* Resume the device if the sink was suspended as well */ + if (!u->sink || !PA_SINK_IS_OPENED(u->sink->thread_info.state)) + if (!setup_transport_and_stream(u)) + return -1; + + /* We don't resume the smoother here. Instead we + * wait until the first packet arrives */ + + break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + break; + } + + return 0; +} + /* Run from main thread */ static void source_set_volume_cb(pa_source *s) { uint16_t gain; @@ -1044,6 +1049,7 @@ static int add_source(struct userdata *u) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) { pa_source_set_set_volume_callback(u->source, source_set_volume_cb); @@ -1061,45 +1067,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ - if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) - break; - - /* Stop the device if the source is suspended as well */ - if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) - /* We deliberately ignore whether stopping - * actually worked. Since the stream_fd is - * closed it doesn't really matter */ - transport_release(u); - - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - if (u->sink->thread_info.state != PA_SINK_SUSPENDED) - break; - - /* Resume the device if the source was suspended as well */ - if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) { - if (!setup_transport_and_stream(u)) - return -1; - } - - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - break; - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { int64_t wi, ri; @@ -1124,6 +1091,50 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: + /* Ignore if transition is PA_SINK_INIT->PA_SINK_SUSPENDED */ + if (!PA_SINK_IS_OPENED(u->sink->thread_info.state)) + break; + + /* Stop the device if the source is suspended as well */ + if (!u->source || u->source->state == PA_SOURCE_SUSPENDED) + /* We deliberately ignore whether stopping + * actually worked. Since the stream_fd is + * closed it doesn't really matter */ + transport_release(u); + + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + if (u->sink->thread_info.state != PA_SINK_SUSPENDED) + break; + + /* Resume the device if the source was suspended as well */ + if (!u->source || !PA_SOURCE_IS_OPENED(u->source->thread_info.state)) + if (!setup_transport_and_stream(u)) + return -1; + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + /* Run from main thread */ static void sink_set_volume_cb(pa_sink *s) { uint16_t gain; @@ -1213,6 +1224,7 @@ static int add_sink(struct userdata *u) { u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->profile == PA_BLUETOOTH_PROFILE_HEADSET_HEAD_UNIT || u->profile == PA_BLUETOOTH_PROFILE_HEADSET_AUDIO_GATEWAY) { pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); diff --git a/src/modules/echo-cancel/module-echo-cancel.c b/src/modules/echo-cancel/module-echo-cancel.c index 7af2f4b22..893c41eeb 100644 --- a/src/modules/echo-cancel/module-echo-cancel.c +++ b/src/modules/echo-cancel/module-echo-cancel.c @@ -458,19 +458,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } - } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -526,6 +513,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from source I/O thread context */ static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -1926,6 +1930,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-combine-sink.c b/src/modules/module-combine-sink.c index 22800a8bb..bbd416b6f 100644 --- a/src/modules/module-combine-sink.c +++ b/src/modules/module-combine-sink.c @@ -718,6 +718,25 @@ static int sink_set_state_in_main_thread_cb(pa_sink *sink, pa_sink_state_t state return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + bool running; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + running = new_state == PA_SINK_RUNNING; + pa_atomic_store(&u->thread_info.running, running); + + if (running) + pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); + else + pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); + + return 0; +} + /* Called from IO context */ static void update_max_request(struct userdata *u) { size_t max_request = 0; @@ -859,19 +878,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: { - bool running = (PA_PTR_TO_UINT(data) == PA_SINK_RUNNING); - - pa_atomic_store(&u->thread_info.running, running); - - if (running) - pa_smoother_resume(u->thread_info.smoother, pa_rtclock_now(), true); - else - pa_smoother_pause(u->thread_info.smoother, pa_rtclock_now()); - - break; - } - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t x, y, c; int64_t *delay = data; @@ -1426,6 +1432,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency; u->sink->userdata = u; diff --git a/src/modules/module-equalizer-sink.c b/src/modules/module-equalizer-sink.c index efe95b3fb..36029b389 100644 --- a/src/modules/module-equalizer-sink.c +++ b/src/modules/module-equalizer-sink.c @@ -267,18 +267,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of //+ pa_bytes_to_usec(u->latency * fs, ss) return 0; } - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -299,6 +287,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -1230,6 +1235,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-esound-sink.c b/src/modules/module-esound-sink.c index d93ad9c5b..9fea2da74 100644 --- a/src/modules/module-esound-sink.c +++ b/src/modules/module-esound-sink.c @@ -141,32 +141,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - - case PA_SINK_SUSPENDED: - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - pa_smoother_pause(u->smoother, pa_rtclock_now()); - break; - - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) - pa_smoother_resume(u->smoother, pa_rtclock_now(), true); - - break; - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - ; - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t w, r; @@ -194,6 +168,38 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + + case PA_SINK_SUSPENDED: + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + pa_smoother_pause(u->smoother, pa_rtclock_now()); + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) + pa_smoother_resume(u->smoother, pa_rtclock_now(), true); + + break; + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + ; + } + + return 0; +} + static void thread_func(void *userdata) { struct userdata *u = userdata; int write_type = 0; @@ -611,6 +617,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->userdata = u; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); diff --git a/src/modules/module-ladspa-sink.c b/src/modules/module-ladspa-sink.c index a2db68e1c..de866d96a 100644 --- a/src/modules/module-ladspa-sink.c +++ b/src/modules/module-ladspa-sink.c @@ -374,18 +374,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of connect_control_ports(u); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -406,6 +394,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -1298,6 +1303,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-null-sink.c b/src/modules/module-null-sink.c index 3ace082df..16b0b6870 100644 --- a/src/modules/module-null-sink.c +++ b/src/modules/module-null-sink.c @@ -89,15 +89,6 @@ static int sink_process_msg( struct userdata *u = PA_SINK(o)->userdata; switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { - if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) - u->timestamp = pa_rtclock_now(); - } - - break; - case PA_SINK_MESSAGE_GET_LATENCY: { pa_usec_t now; @@ -111,6 +102,21 @@ static int sink_process_msg( return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { + if (PA_SINK_IS_OPENED(new_state)) + u->timestamp = pa_rtclock_now(); + } + + return 0; +} + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -297,6 +303,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; diff --git a/src/modules/module-null-source.c b/src/modules/module-null-source.c index 41f17bd98..ae67206a3 100644 --- a/src/modules/module-null-source.c +++ b/src/modules/module-null-source.c @@ -89,13 +89,6 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off struct userdata *u = PA_SOURCE(o)->userdata; switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING) - u->timestamp = pa_rtclock_now(); - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { pa_usec_t now; @@ -109,6 +102,19 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (new_state == PA_SOURCE_RUNNING) + u->timestamp = pa_rtclock_now(); + + return 0; +} + static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -229,6 +235,7 @@ int pa__init(pa_module*m) { u->latency_time = latency_time; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; u->source->userdata = u; diff --git a/src/modules/module-pipe-sink.c b/src/modules/module-pipe-sink.c index 995785e1e..b2378059b 100644 --- a/src/modules/module-pipe-sink.c +++ b/src/modules/module-pipe-sink.c @@ -110,24 +110,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse struct userdata *u = PA_SINK(o)->userdata; switch (code) { - case PA_SINK_MESSAGE_SET_STATE: - if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { - if (PA_SINK_IS_OPENED(PA_PTR_TO_UINT(data))) - u->timestamp = pa_rtclock_now(); - } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) { - if (PA_PTR_TO_UINT(data) == PA_SINK_SUSPENDED) { - /* Clear potential FIFO error flag */ - u->fifo_error = false; - - /* Continuously dropping data (clear counter on entering suspended state. */ - if (u->bytes_dropped != 0) { - pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); - u->bytes_dropped = 0; - } - } - } - break; - case PA_SINK_MESSAGE_GET_LATENCY: if (u->use_system_clock_for_timing) { pa_usec_t now; @@ -153,6 +135,32 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED || u->sink->thread_info.state == PA_SINK_INIT) { + if (PA_SINK_IS_OPENED(new_state)) + u->timestamp = pa_rtclock_now(); + } else if (u->sink->thread_info.state == PA_SINK_RUNNING || u->sink->thread_info.state == PA_SINK_IDLE) { + if (new_state == PA_SINK_SUSPENDED) { + /* Clear potential FIFO error flag */ + u->fifo_error = false; + + /* Continuously dropping data (clear counter on entering suspended state. */ + if (u->bytes_dropped != 0) { + pa_log_debug("Pipe-sink continuously dropping data - clear statistics (%zu -> 0 bytes dropped)", u->bytes_dropped); + u->bytes_dropped = 0; + } + } + } + + return 0; +} + static void sink_update_requested_latency_cb(pa_sink *s) { struct userdata *u; size_t nbytes; @@ -505,6 +513,7 @@ int pa__init(pa_module *m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; if (u->use_system_clock_for_timing) u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->userdata = u; diff --git a/src/modules/module-remap-sink.c b/src/modules/module-remap-sink.c index ec6698795..56e7a85fd 100644 --- a/src/modules/module-remap-sink.c +++ b/src/modules/module-remap-sink.c @@ -94,18 +94,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -126,6 +114,23 @@ static int sink_set_state_in_main_thread(pa_sink *s, pa_sink_state_t state, pa_s return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind(pa_sink *s) { struct userdata *u; @@ -411,6 +416,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency; u->sink->request_rewind = sink_request_rewind; u->sink->userdata = u; diff --git a/src/modules/module-sine-source.c b/src/modules/module-sine-source.c index f4c297384..39fb71ab3 100644 --- a/src/modules/module-sine-source.c +++ b/src/modules/module-sine-source.c @@ -87,13 +87,6 @@ static int source_process_msg( switch (code) { - case PA_SOURCE_MESSAGE_SET_STATE: - - if (PA_PTR_TO_UINT(data) == PA_SOURCE_RUNNING) - u->timestamp = pa_rtclock_now(); - - break; - case PA_SOURCE_MESSAGE_GET_LATENCY: { pa_usec_t now, left_to_fill; @@ -109,6 +102,19 @@ static int source_process_msg( return pa_source_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (new_state == PA_SOURCE_RUNNING) + u->timestamp = pa_rtclock_now(); + + return 0; +} + static void source_update_requested_latency_cb(pa_source *s) { struct userdata *u; @@ -257,6 +263,7 @@ int pa__init(pa_module*m) { } u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; u->source->userdata = u; diff --git a/src/modules/module-solaris.c b/src/modules/module-solaris.c index a4960b8b7..e68f2a93d 100644 --- a/src/modules/module-solaris.c +++ b/src/modules/module-solaris.c @@ -390,51 +390,57 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse case PA_SINK_MESSAGE_GET_LATENCY: *((int64_t*) data) = sink_get_latency(u, &PA_SINK(o)->sample_spec); return 0; + } - case PA_SINK_MESSAGE_SET_STATE: + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; - case PA_SINK_SUSPENDED: + pa_assert(s); + pa_assert_se(u = s->userdata); - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + switch (new_state) { - pa_smoother_pause(u->smoother, pa_rtclock_now()); + case PA_SINK_SUSPENDED: - if (!u->source || u->source_suspended) - suspend(u); + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - u->sink_suspended = true; - break; + pa_smoother_pause(u->smoother, pa_rtclock_now()); - case PA_SINK_IDLE: - case PA_SINK_RUNNING: - - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - pa_smoother_resume(u->smoother, pa_rtclock_now(), true); - - if (!u->source || u->source_suspended) { - bool mute; - if (unsuspend(u) < 0) - return -1; - u->sink->get_volume(u->sink); - if (u->sink->get_mute(u->sink, &mute) >= 0) - pa_sink_set_mute(u->sink, mute, false); - } - u->sink_suspended = false; - } - break; + if (!u->source || u->source_suspended) + suspend(u); - case PA_SINK_INVALID_STATE: - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - ; - } + u->sink_suspended = true; + break; + + case PA_SINK_IDLE: + case PA_SINK_RUNNING: + + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + pa_smoother_resume(u->smoother, pa_rtclock_now(), true); + if (!u->source || u->source_suspended) { + bool mute; + if (unsuspend(u) < 0) + return -1; + u->sink->get_volume(u->sink); + if (u->sink->get_mute(u->sink, &mute) >= 0) + pa_sink_set_mute(u->sink, mute, false); + } + u->sink_suspended = false; + } break; + + case PA_SINK_INVALID_STATE: + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + ; } - return pa_sink_process_msg(o, code, data, offset, chunk); + return 0; } static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { @@ -445,45 +451,51 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off case PA_SOURCE_MESSAGE_GET_LATENCY: *((pa_usec_t*) data) = source_get_latency(u, &PA_SOURCE(o)->sample_spec); return 0; + } - case PA_SOURCE_MESSAGE_SET_STATE: + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; - case PA_SOURCE_SUSPENDED: + pa_assert(s); + pa_assert_se(u = s->userdata); - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + switch (new_state) { - if (!u->sink || u->sink_suspended) - suspend(u); + case PA_SOURCE_SUSPENDED: - u->source_suspended = true; - break; + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: + if (!u->sink || u->sink_suspended) + suspend(u); - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - if (!u->sink || u->sink_suspended) { - if (unsuspend(u) < 0) - return -1; - u->source->get_volume(u->source); - } - u->source_suspended = false; - } - break; + u->source_suspended = true; + break; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + if (!u->sink || u->sink_suspended) { + if (unsuspend(u) < 0) + return -1; + u->source->get_volume(u->source); + } + u->source_suspended = false; } break; + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; + } - return pa_source_process_msg(o, code, data, offset, chunk); + return 0; } static void sink_set_volume(pa_sink *s) { @@ -960,6 +972,7 @@ int pa__init(pa_module *m) { u->source->userdata = u; u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); pa_source_set_rtpoll(u->source, u->rtpoll); @@ -1003,6 +1016,7 @@ int pa__init(pa_module *m) { pa_assert(u->sink); u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); pa_sink_set_rtpoll(u->sink, u->rtpoll); diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c index 8a67b81f1..313903372 100644 --- a/src/modules/module-tunnel-sink-new.c +++ b/src/modules/module-tunnel-sink-new.c @@ -429,28 +429,37 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of *((int64_t*) data) = remote_latency; return 0; } - case PA_SINK_MESSAGE_SET_STATE: - if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) - break; + } + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - case PA_SINK_SUSPENDED: { - cork_stream(u, true); - break; - } - case PA_SINK_IDLE: - case PA_SINK_RUNNING: { - cork_stream(u, false); - break; - } - case PA_SINK_INVALID_STATE: - case PA_SINK_INIT: - case PA_SINK_UNLINKED: - break; - } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) + return 0; + + switch (new_state) { + case PA_SINK_SUSPENDED: { + cork_stream(u, true); + break; + } + case PA_SINK_IDLE: + case PA_SINK_RUNNING: { + cork_stream(u, false); + break; + } + case PA_SINK_INVALID_STATE: + case PA_SINK_INIT: + case PA_SINK_UNLINKED: break; } - return pa_sink_process_msg(o, code, data, offset, chunk); + + return 0; } int pa__init(pa_module *m) { @@ -545,6 +554,7 @@ int pa__init(pa_module *m) { pa_sink_new_data_done(&sink_data); u->sink->userdata = u; u->sink->parent.process_msg = sink_process_msg_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC); diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c index 7ad077111..d0a5414ad 100644 --- a/src/modules/module-tunnel-source-new.c +++ b/src/modules/module-tunnel-source-new.c @@ -428,28 +428,37 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t return 0; } - case PA_SOURCE_MESSAGE_SET_STATE: - if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) - break; + } + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch ((pa_source_state_t) PA_PTR_TO_UINT(data)) { - case PA_SOURCE_SUSPENDED: { - cork_stream(u, true); - break; - } - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: { - cork_stream(u, false); - break; - } - case PA_SOURCE_INVALID_STATE: - case PA_SOURCE_INIT: - case PA_SOURCE_UNLINKED: - break; - } +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + if (!u->stream || pa_stream_get_state(u->stream) != PA_STREAM_READY) + return 0; + + switch (new_state) { + case PA_SOURCE_SUSPENDED: { + cork_stream(u, true); + break; + } + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: { + cork_stream(u, false); + break; + } + case PA_SOURCE_INVALID_STATE: + case PA_SOURCE_INIT: + case PA_SOURCE_UNLINKED: break; } - return pa_source_process_msg(o, code, data, offset, chunk); + + return 0; } int pa__init(pa_module *m) { @@ -541,6 +550,7 @@ int pa__init(pa_module *m) { pa_source_new_data_done(&source_data); u->source->userdata = u; u->source->parent.process_msg = source_process_msg_cb; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->update_requested_latency = source_update_requested_latency_cb; pa_source_set_asyncmsgq(u->source, u->thread_mq->inq); diff --git a/src/modules/module-virtual-sink.c b/src/modules/module-virtual-sink.c index ca6ce5696..68ad20076 100644 --- a/src/modules/module-virtual-sink.c +++ b/src/modules/module-virtual-sink.c @@ -106,18 +106,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -138,6 +126,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -556,6 +561,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/module-virtual-surround-sink.c b/src/modules/module-virtual-surround-sink.c index 00780d8bd..7c5e246cf 100644 --- a/src/modules/module-virtual-surround-sink.c +++ b/src/modules/module-virtual-surround-sink.c @@ -134,18 +134,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of pa_bytes_to_usec(pa_memblockq_get_length(u->sink_input->thread_info.render_memblockq), &u->sink_input->sink->sample_spec); return 0; - - case PA_SINK_MESSAGE_SET_STATE: { - pa_sink_state_t new_state = (pa_sink_state_t) PA_PTR_TO_UINT(data); - - /* When set to running or idle for the first time, request a rewind - * of the master sink to make sure we are heard immediately */ - if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { - pa_log_debug("Requesting rewind due to state change."); - pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); - } - break; - } } return pa_sink_process_msg(o, code, data, offset, chunk); @@ -166,6 +154,23 @@ static int sink_set_state_in_main_thread_cb(pa_sink *s, pa_sink_state_t state, p return 0; } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + /* When set to running or idle for the first time, request a rewind + * of the master sink to make sure we are heard immediately */ + if ((new_state == PA_SINK_IDLE || new_state == PA_SINK_RUNNING) && u->sink->thread_info.state == PA_SINK_INIT) { + pa_log_debug("Requesting rewind due to state change."); + pa_sink_input_request_rewind(u->sink_input, 0, false, true, true); + } + + return 0; +} + /* Called from I/O thread context */ static void sink_request_rewind_cb(pa_sink *s) { struct userdata *u; @@ -730,6 +735,7 @@ int pa__init(pa_module*m) { u->sink->parent.process_msg = sink_process_msg_cb; u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->update_requested_latency = sink_update_requested_latency_cb; u->sink->request_rewind = sink_request_rewind_cb; pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); diff --git a/src/modules/oss/module-oss.c b/src/modules/oss/module-oss.c index 7d1b9f52b..d2551bcfc 100644 --- a/src/modules/oss/module-oss.c +++ b/src/modules/oss/module-oss.c @@ -643,8 +643,6 @@ fail: /* Called from IO context */ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SINK(o)->userdata; - bool do_trigger = false, quick = true; - pa_sink_state_t new_state; switch (code) { @@ -662,68 +660,73 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return 0; } + } - case PA_SINK_MESSAGE_SET_STATE: - new_state = PA_PTR_TO_UINT(data); + return pa_sink_process_msg(o, code, data, offset, chunk); +} - switch (new_state) { +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + bool do_trigger = false; + bool quick = true; - case PA_SINK_SUSPENDED: - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + pa_assert(s); + pa_assert_se(u = s->userdata); - if (!u->source || u->source_suspended) - suspend(u); + switch (new_state) { - do_trigger = true; + case PA_SINK_SUSPENDED: + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - u->sink_suspended = true; - break; + if (!u->source || u->source_suspended) + suspend(u); - case PA_SINK_IDLE: - case PA_SINK_RUNNING: + do_trigger = true; - if (u->sink->thread_info.state == PA_SINK_INIT) { - do_trigger = true; - quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state); - } + u->sink_suspended = true; + break; - if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { + case PA_SINK_IDLE: + case PA_SINK_RUNNING: - if (!u->source || u->source_suspended) { - if (unsuspend(u) < 0) - return -1; - quick = false; - } + if (u->sink->thread_info.state == PA_SINK_INIT) { + do_trigger = true; + quick = u->source && PA_SOURCE_IS_OPENED(u->source->thread_info.state); + } - do_trigger = true; + if (u->sink->thread_info.state == PA_SINK_SUSPENDED) { - u->out_mmap_current = 0; - u->out_mmap_saved_nfrags = 0; + if (!u->source || u->source_suspended) { + if (unsuspend(u) < 0) + return -1; + quick = false; + } - u->sink_suspended = false; - } + do_trigger = true; - break; + u->out_mmap_current = 0; + u->out_mmap_saved_nfrags = 0; - case PA_SINK_INVALID_STATE: - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - ; + u->sink_suspended = false; } break; + + case PA_SINK_INVALID_STATE: + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + ; } if (do_trigger) trigger(u, new_state, u->source ? u->source->thread_info.state : PA_SOURCE_INVALID_STATE, quick); - return pa_sink_process_msg(o, code, data, offset, chunk); + return 0; } static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) { struct userdata *u = PA_SOURCE(o)->userdata; - bool do_trigger = false, quick = true; - pa_source_state_t new_state; switch (code) { @@ -740,61 +743,68 @@ static int source_process_msg(pa_msgobject *o, int code, void *data, int64_t off *((int64_t*) data) = (int64_t)r; return 0; } + } - case PA_SOURCE_MESSAGE_SET_STATE: - new_state = PA_PTR_TO_UINT(data); + return pa_source_process_msg(o, code, data, offset, chunk); +} - switch (new_state) { +/* Called from the IO thread. */ +static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_state) { + struct userdata *u; + bool do_trigger = false; + bool quick = true; - case PA_SOURCE_SUSPENDED: - pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); + pa_assert(s); + pa_assert_se(u = s->userdata); - if (!u->sink || u->sink_suspended) - suspend(u); + switch (new_state) { - do_trigger = true; + case PA_SOURCE_SUSPENDED: + pa_assert(PA_SOURCE_IS_OPENED(u->source->thread_info.state)); - u->source_suspended = true; - break; + if (!u->sink || u->sink_suspended) + suspend(u); - case PA_SOURCE_IDLE: - case PA_SOURCE_RUNNING: + do_trigger = true; - if (u->source->thread_info.state == PA_SOURCE_INIT) { - do_trigger = true; - quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state); - } + u->source_suspended = true; + break; - if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { + case PA_SOURCE_IDLE: + case PA_SOURCE_RUNNING: - if (!u->sink || u->sink_suspended) { - if (unsuspend(u) < 0) - return -1; - quick = false; - } + if (u->source->thread_info.state == PA_SOURCE_INIT) { + do_trigger = true; + quick = u->sink && PA_SINK_IS_OPENED(u->sink->thread_info.state); + } - do_trigger = true; + if (u->source->thread_info.state == PA_SOURCE_SUSPENDED) { - u->in_mmap_current = 0; - u->in_mmap_saved_nfrags = 0; + if (!u->sink || u->sink_suspended) { + if (unsuspend(u) < 0) + return -1; + quick = false; + } - u->source_suspended = false; - } - break; + do_trigger = true; - case PA_SOURCE_UNLINKED: - case PA_SOURCE_INIT: - case PA_SOURCE_INVALID_STATE: - ; + u->in_mmap_current = 0; + u->in_mmap_saved_nfrags = 0; + u->source_suspended = false; } break; + + case PA_SOURCE_UNLINKED: + case PA_SOURCE_INIT: + case PA_SOURCE_INVALID_STATE: + ; } if (do_trigger) trigger(u, u->sink ? u->sink->thread_info.state : PA_SINK_INVALID_STATE, new_state, quick); - return pa_source_process_msg(o, code, data, offset, chunk); + return 0; } static void sink_get_volume(pa_sink *s) { @@ -1334,6 +1344,7 @@ int pa__init(pa_module*m) { } u->source->parent.process_msg = source_process_msg; + u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb; u->source->userdata = u; pa_source_set_asyncmsgq(u->source, u->thread_mq.inq); @@ -1403,6 +1414,7 @@ int pa__init(pa_module*m) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; u->sink->userdata = u; pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq); diff --git a/src/modules/raop/raop-sink.c b/src/modules/raop/raop-sink.c index 936129cf5..baa346641 100644 --- a/src/modules/raop/raop-sink.c +++ b/src/modules/raop/raop-sink.c @@ -136,64 +136,6 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse pa_assert(u->raop); switch (code) { - case PA_SINK_MESSAGE_SET_STATE: { - switch ((pa_sink_state_t) PA_PTR_TO_UINT(data)) { - case PA_SINK_SUSPENDED: { - pa_log_debug("RAOP: SUSPENDED"); - - pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); - - /* Issue a TEARDOWN if we are still connected */ - if (pa_raop_client_is_alive(u->raop)) { - pa_raop_client_teardown(u->raop); - } - - break; - } - - case PA_SINK_IDLE: { - pa_log_debug("RAOP: IDLE"); - - /* Issue a FLUSH if we're comming from running state */ - if (u->sink->thread_info.state == PA_SINK_RUNNING) { - pa_rtpoll_set_timer_disabled(u->rtpoll); - pa_raop_client_flush(u->raop); - } - - break; - } - - case PA_SINK_RUNNING: { - pa_usec_t now; - - pa_log_debug("RAOP: RUNNING"); - - now = pa_rtclock_now(); - pa_smoother_reset(u->smoother, now, false); - - if (!pa_raop_client_is_alive(u->raop)) { - /* Connecting will trigger a RECORD and start steaming */ - pa_raop_client_announce(u->raop); - } else if (!pa_raop_client_can_stream(u->raop)) { - /* RECORD alredy sent, simply start streaming */ - pa_raop_client_stream(u->raop); - pa_rtpoll_set_timer_absolute(u->rtpoll, now); - u->write_count = 0; - u->start = now; - } - - break; - } - - case PA_SINK_UNLINKED: - case PA_SINK_INIT: - case PA_SINK_INVALID_STATE: - break; - } - - break; - } - case PA_SINK_MESSAGE_GET_LATENCY: { int64_t r = 0; @@ -278,6 +220,68 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse return pa_sink_process_msg(o, code, data, offset, chunk); } +/* Called from the IO thread. */ +static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state) { + struct userdata *u; + + pa_assert(s); + pa_assert_se(u = s->userdata); + + switch (new_state) { + case PA_SINK_SUSPENDED: + pa_log_debug("RAOP: SUSPENDED"); + + pa_assert(PA_SINK_IS_OPENED(u->sink->thread_info.state)); + + /* Issue a TEARDOWN if we are still connected */ + if (pa_raop_client_is_alive(u->raop)) { + pa_raop_client_teardown(u->raop); + } + + break; + + case PA_SINK_IDLE: + pa_log_debug("RAOP: IDLE"); + + /* Issue a FLUSH if we're comming from running state */ + if (u->sink->thread_info.state == PA_SINK_RUNNING) { + pa_rtpoll_set_timer_disabled(u->rtpoll); + pa_raop_client_flush(u->raop); + } + + break; + + case PA_SINK_RUNNING: { + pa_usec_t now; + + pa_log_debug("RAOP: RUNNING"); + + now = pa_rtclock_now(); + pa_smoother_reset(u->smoother, now, false); + + if (!pa_raop_client_is_alive(u->raop)) { + /* Connecting will trigger a RECORD and start steaming */ + pa_raop_client_announce(u->raop); + } else if (!pa_raop_client_can_stream(u->raop)) { + /* RECORD alredy sent, simply start streaming */ + pa_raop_client_stream(u->raop); + pa_rtpoll_set_timer_absolute(u->rtpoll, now); + u->write_count = 0; + u->start = now; + } + + break; + } + + case PA_SINK_UNLINKED: + case PA_SINK_INIT: + case PA_SINK_INVALID_STATE: + break; + } + + return 0; +} + static void sink_set_volume_cb(pa_sink *s) { struct userdata *u = s->userdata; pa_cvolume hw; @@ -696,6 +700,7 @@ pa_sink* pa_raop_sink_new(pa_module *m, pa_modargs *ma, const char *driver) { } u->sink->parent.process_msg = sink_process_msg; + u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb; pa_sink_set_set_volume_callback(u->sink, sink_set_volume_cb); pa_sink_set_set_mute_callback(u->sink, sink_set_mute_cb); u->sink->userdata = u; diff --git a/src/pulsecore/sink.c b/src/pulsecore/sink.c index 6549515b5..2c9334931 100644 --- a/src/pulsecore/sink.c +++ b/src/pulsecore/sink.c @@ -151,6 +151,7 @@ static void reset_callbacks(pa_sink *s) { pa_assert(s); s->set_state_in_main_thread = NULL; + s->set_state_in_io_thread = NULL; s->get_volume = NULL; s->set_volume = NULL; s->write_volume = NULL; @@ -2850,6 +2851,13 @@ int pa_sink_process_msg(pa_msgobject *o, int code, void *userdata, int64_t offse (s->thread_info.state == PA_SINK_SUSPENDED && PA_SINK_IS_OPENED(PA_PTR_TO_UINT(userdata))) || (PA_SINK_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SINK_SUSPENDED); + if (s->set_state_in_io_thread) { + int r; + + if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0) + return r; + } + s->thread_info.state = PA_PTR_TO_UINT(userdata); if (s->thread_info.state == PA_SINK_SUSPENDED) { diff --git a/src/pulsecore/sink.h b/src/pulsecore/sink.h index 0caeb550b..e1ea52495 100644 --- a/src/pulsecore/sink.h +++ b/src/pulsecore/sink.h @@ -124,19 +124,31 @@ struct pa_sink { bool set_mute_in_progress; - /* Called when the main loop requests a state change. Called from - * main loop context. If returns -1 the state change will be - * inhibited. This will also be called even if only the suspend cause + /* Callbacks for doing things when the sink state and/or suspend cause is + * changed. It's fine to set either or both of the callbacks to NULL if the + * implementation doesn't have anything to do on state or suspend cause * changes. * - * s->state and s->suspend_cause haven't been updated yet when this is - * called, so the callback can get the old state through those variables. + * set_state_in_main_thread() is called first. The callback is allowed to + * report failure if and only if the sink changes its state from + * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow + * failure also when changing state from INIT to IDLE or RUNNING, but + * currently that will crash pa_sink_put().) If + * set_state_in_main_thread() fails, set_state_in_io_thread() won't be + * called. * - * If set_state_in_main_thread() is successful, the IO thread will be - * notified with the SET_STATE message. The message handler is allowed to - * fail, in which case the old state is restored, and - * set_state_in_main_thread() is called again. */ + * If set_state_in_main_thread() is successful (or not set), then + * set_state_in_io_thread() is called. Again, failure is allowed if and + * only if the sink changes state from SUSPENDED to IDLE or RUNNING. If + * set_state_in_io_thread() fails, then set_state_in_main_thread() is + * called again, this time with the state parameter set to SUSPENDED and + * the suspend_cause parameter set to 0. + * + * pa_sink.state, pa_sink.thread_info.state and pa_sink.suspend_cause + * are updated only after all the callback calls. In case of failure, the + * state is set to SUSPENDED and the suspend cause is set to 0. */ int (*set_state_in_main_thread)(pa_sink *s, pa_sink_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + int (*set_state_in_io_thread)(pa_sink *s, pa_sink_state_t state); /* may be NULL */ /* Sink drivers that support hardware volume may set this * callback. This is called when the current volume needs to be diff --git a/src/pulsecore/source.c b/src/pulsecore/source.c index ad8e5e364..dd56eb082 100644 --- a/src/pulsecore/source.c +++ b/src/pulsecore/source.c @@ -142,6 +142,7 @@ static void reset_callbacks(pa_source *s) { pa_assert(s); s->set_state_in_main_thread = NULL; + s->set_state_in_io_thread = NULL; s->get_volume = NULL; s->set_volume = NULL; s->write_volume = NULL; @@ -2224,6 +2225,13 @@ int pa_source_process_msg(pa_msgobject *object, int code, void *userdata, int64_ (s->thread_info.state == PA_SOURCE_SUSPENDED && PA_SOURCE_IS_OPENED(PA_PTR_TO_UINT(userdata))) || (PA_SOURCE_IS_OPENED(s->thread_info.state) && PA_PTR_TO_UINT(userdata) == PA_SOURCE_SUSPENDED); + if (s->set_state_in_io_thread) { + int r; + + if ((r = s->set_state_in_io_thread(s, PA_PTR_TO_UINT(userdata))) < 0) + return r; + } + s->thread_info.state = PA_PTR_TO_UINT(userdata); if (suspend_change) { diff --git a/src/pulsecore/source.h b/src/pulsecore/source.h index d60e8a1a8..c4fda7965 100644 --- a/src/pulsecore/source.h +++ b/src/pulsecore/source.h @@ -125,19 +125,31 @@ struct pa_source { bool set_mute_in_progress; - /* Called when the main loop requests a state change. Called from - * main loop context. If returns -1 the state change will be - * inhibited. This will also be called even if only the suspend cause + /* Callbacks for doing things when the source state and/or suspend cause is + * changed. It's fine to set either or both of the callbacks to NULL if the + * implementation doesn't have anything to do on state or suspend cause * changes. * - * s->state and s->suspend_cause haven't been updated yet when this is - * called, so the callback can get the old state through those variables. + * set_state_in_main_thread() is called first. The callback is allowed to + * report failure if and only if the source changes its state from + * SUSPENDED to IDLE or RUNNING. (FIXME: It would make sense to allow + * failure also when changing state from INIT to IDLE or RUNNING, but + * currently that will crash pa_source_put().) If + * set_state_in_main_thread() fails, set_state_in_io_thread() won't be + * called. * - * If set_state_in_main_thread() is successful, the IO thread will be - * notified with the SET_STATE message. The message handler is allowed to - * fail, in which case the old state is restored, and - * set_state_in_main_thread() is called again. */ - int (*set_state_in_main_thread)(pa_source *source, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + * If set_state_in_main_thread() is successful (or not set), then + * set_state_in_io_thread() is called. Again, failure is allowed if and + * only if the source changes state from SUSPENDED to IDLE or RUNNING. If + * set_state_in_io_thread() fails, then set_state_in_main_thread() is + * called again, this time with the state parameter set to SUSPENDED and + * the suspend_cause parameter set to 0. + * + * pa_source.state, pa_source.thread_info.state and pa_source.suspend_cause + * are updated only after all the callback calls. In case of failure, the + * state is set to SUSPENDED and the suspend cause is set to 0. */ + int (*set_state_in_main_thread)(pa_source *s, pa_source_state_t state, pa_suspend_cause_t suspend_cause); /* may be NULL */ + int (*set_state_in_io_thread)(pa_source *s, pa_source_state_t state); /* may be NULL */ /* Called when the volume is queried. Called from main loop * context. If this is NULL a PA_SOURCE_MESSAGE_GET_VOLUME message |