diff options
-rw-r--r-- | src/modules/module-tunnel-sink-new.c | 54 | ||||
-rw-r--r-- | src/modules/module-tunnel-source-new.c | 58 | ||||
-rw-r--r-- | src/modules/module-tunnel.c | 103 | ||||
-rw-r--r-- | src/modules/restart-module.c | 63 | ||||
-rw-r--r-- | src/modules/restart-module.h | 12 |
5 files changed, 231 insertions, 59 deletions
diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c index 6b862023c..3cea25a90 100644 --- a/src/modules/module-tunnel-sink-new.c +++ b/src/modules/module-tunnel-sink-new.c @@ -118,6 +118,11 @@ struct userdata { pa_usec_t reconnect_interval_us; }; +struct module_restart_data { + struct userdata *userdata; + pa_restart_data *restart_data; +}; + static const char* const valid_modargs[] = { "sink_name", "sink_properties", @@ -337,9 +342,18 @@ static void stream_overflow_callback(pa_stream *stream, void *userdata) { /* Do a reinit of the module. Note that u will be freed as a result of this * call. */ -static void maybe_restart(struct userdata *u) { +static void maybe_restart(struct module_restart_data *rd) { + struct userdata *u = rd->userdata; + + if (rd->restart_data) { + pa_log_debug("Restart already pending"); + return; + } + if (u->reconnect_interval_us > 0) { - pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); + /* The handle returned here must be freed when do_init() finishes successfully + * and when the module exits. */ + rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); } else { /* exit the module */ pa_module_unload_request(u->module, true); @@ -620,7 +634,7 @@ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t off create_sink(u); break; case TUNNEL_MESSAGE_MAYBE_RESTART: - maybe_restart(u); + maybe_restart(u->module->userdata); break; } @@ -629,12 +643,16 @@ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t off static int do_init(pa_module *m) { struct userdata *u = NULL; + struct module_restart_data *rd; pa_modargs *ma = NULL; const char *remote_server = NULL; char *default_sink_name = NULL; uint32_t reconnect_interval_ms = 0; pa_assert(m); + pa_assert(m->userdata); + + rd = m->userdata; if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments."); @@ -643,7 +661,7 @@ static int do_init(pa_module *m) { u = pa_xnew0(struct userdata, 1); u->module = m; - m->userdata = u; + rd->userdata = u; u->sample_spec = m->core->default_sample_spec; u->channel_map = m->core->default_channel_map; @@ -711,6 +729,16 @@ static int do_init(pa_module *m) { goto fail; } + /* If the module is restarting and do_init() finishes successfully, the + * restart data is no longer needed. If do_init() fails, don't touch the + * restart data, because following restart attempts will continue to use + * the same data. If restart_data is NULL, that means no restart is + * currently pending. */ + if (rd->restart_data) { + pa_restart_free(rd->restart_data); + rd->restart_data = NULL; + } + pa_modargs_free(ma); pa_xfree(default_sink_name); @@ -728,10 +756,13 @@ fail: static void do_done(pa_module *m) { struct userdata *u = NULL; + struct module_restart_data *rd; pa_assert(m); - if (!(u = m->userdata)) + if (!(rd = m->userdata)) + return; + if (!(u = rd->userdata)) return; u->shutting_down = true; @@ -777,7 +808,7 @@ static void do_done(pa_module *m) { pa_xfree(u); - m->userdata = NULL; + rd->userdata = NULL; } int pa__init(pa_module *m) { @@ -785,6 +816,8 @@ int pa__init(pa_module *m) { pa_assert(m); + m->userdata = pa_xnew0(struct module_restart_data, 1); + ret = do_init(m); if (ret < 0) @@ -797,4 +830,13 @@ void pa__done(pa_module *m) { pa_assert(m); do_done(m); + + if (m->userdata) { + struct module_restart_data *rd = m->userdata; + + if (rd->restart_data) + pa_restart_free(rd->restart_data); + + pa_xfree(m->userdata); + } } diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c index 21d569d99..b96137a24 100644 --- a/src/modules/module-tunnel-source-new.c +++ b/src/modules/module-tunnel-source-new.c @@ -116,6 +116,11 @@ struct userdata { pa_usec_t reconnect_interval_us; }; +struct module_restart_data { + struct userdata *userdata; + pa_restart_data *restart_data; +}; + static const char* const valid_modargs[] = { "source_name", "source_properties", @@ -320,12 +325,19 @@ static void stream_state_cb(pa_stream *stream, void *userdata) { } /* Do a reinit of the module. Note that u will be freed as a result of this - * call, while pu will live on to the next iteration. It's up to do_done to - * copy anything that we want to persist across iterations out of u and into pu - */ -static void maybe_restart(struct userdata *u) { + * call. */ +static void maybe_restart(struct module_restart_data *rd) { + struct userdata *u = rd->userdata; + + if (rd->restart_data) { + pa_log_debug("Restart already pending"); + return; + } + if (u->reconnect_interval_us > 0) { - pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); + /* The handle returned here must be freed when do_init() finishes successfully + * and when the module exits. */ + rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); } else { /* exit the module */ pa_module_unload_request(u->module, true); @@ -594,7 +606,7 @@ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t off create_source(u); break; case TUNNEL_MESSAGE_MAYBE_RESTART: - maybe_restart(u); + maybe_restart(u->module->userdata); break; } @@ -603,12 +615,16 @@ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t off static int do_init(pa_module *m) { struct userdata *u = NULL; + struct module_restart_data *rd; pa_modargs *ma = NULL; const char *remote_server = NULL; char *default_source_name = NULL; uint32_t reconnect_interval_ms = 0; pa_assert(m); + pa_assert(m->userdata); + + rd = m->userdata; if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments."); @@ -617,7 +633,7 @@ static int do_init(pa_module *m) { u = pa_xnew0(struct userdata, 1); u->module = m; - m->userdata = u; + rd->userdata = u; u->sample_spec = m->core->default_sample_spec; u->channel_map = m->core->default_channel_map; @@ -682,6 +698,16 @@ static int do_init(pa_module *m) { goto fail; } + /* If the module is restarting and do_init() finishes successfully, the + * restart data is no longer needed. If do_init() fails, don't touch the + * restart data, because following restart attempts will continue to use + * the same data. If restart_data is NULL, that means no restart is + * currently pending. */ + if (rd->restart_data) { + pa_restart_free(rd->restart_data); + rd->restart_data = NULL; + } + pa_modargs_free(ma); pa_xfree(default_source_name); @@ -699,10 +725,13 @@ fail: static void do_done(pa_module *m) { struct userdata *u = NULL; + struct module_restart_data *rd; pa_assert(m); - if (!(u = m->userdata)) + if (!(rd = m->userdata)) + return; + if (!(u = rd->userdata)) return; u->shutting_down = true; @@ -748,7 +777,7 @@ static void do_done(pa_module *m) { pa_xfree(u); - m->userdata = NULL; + rd->userdata = NULL; } int pa__init(pa_module *m) { @@ -756,6 +785,8 @@ int pa__init(pa_module *m) { pa_assert(m); + m->userdata = pa_xnew0(struct module_restart_data, 1); + ret = do_init(m); if (ret < 0) @@ -768,4 +799,13 @@ void pa__done(pa_module *m) { pa_assert(m); do_done(m); + + if (m->userdata) { + struct module_restart_data *rd = m->userdata; + + if (rd->restart_data) + pa_restart_free(rd->restart_data); + + pa_xfree(m->userdata); + } } diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c index e55f2622e..5ecc521a6 100644 --- a/src/modules/module-tunnel.c +++ b/src/modules/module-tunnel.c @@ -292,6 +292,11 @@ struct userdata { pa_usec_t reconnect_interval_us; }; +struct module_restart_data { + struct userdata *userdata; + pa_restart_data *restart_data; +}; + static void request_latency(struct userdata *u); #ifdef TUNNEL_SINK static void create_sink(struct userdata *u); @@ -302,15 +307,21 @@ static void on_source_created(struct userdata *u); #endif /* Do a reinit of the module. Note that u will be freed as a result of this - * call, while pu will live on to the next iteration. It's up to do_done to - * copy anything that we want to persist across iterations out of u and into pu - */ -static void unload_module(struct userdata *u) { + * call. */ +static void unload_module(struct module_restart_data *rd) { + struct userdata *u = rd->userdata; + + if (rd->restart_data) { + pa_log_debug("Restart already pending"); + return; + } + if (u->reconnect_interval_us > 0) { - pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); - } else { + /* The handle returned here must be freed when do_init() was successful and when the + * module exits. */ + rd->restart_data = pa_restart_module_reinit(u->module, do_init, do_done, u->reconnect_interval_us); + } else pa_module_unload_request(u->module, true); - } } /* Called from main context */ @@ -328,7 +339,7 @@ static void command_stream_killed(pa_pdispatch *pd, uint32_t command, uint32_t pa_assert(u->pdispatch == pd); pa_log_warn("Stream killed"); - unload_module(u); + unload_module(u->module->userdata); } /* Called from main context */ @@ -360,7 +371,7 @@ static void command_suspended(pa_pdispatch *pd, uint32_t command, uint32_t tag !pa_tagstruct_eof(t)) { pa_log("Invalid packet."); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -393,7 +404,7 @@ static void command_moved(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa pa_tagstruct_get_boolean(t, &suspended) < 0) { pa_log_error("Invalid packet."); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -422,7 +433,7 @@ static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t comman pa_tagstruct_getu32(t, &maxlength) < 0) { pa_log_error("Invalid packet."); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -431,7 +442,7 @@ static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t comman pa_tagstruct_get_usec(t, &usec) < 0) { pa_log_error("Invalid packet."); - unload_module(u); + unload_module(u->module->userdata); return; } } else { @@ -441,7 +452,7 @@ static void command_stream_buffer_attr_changed(pa_pdispatch *pd, uint32_t comman pa_tagstruct_get_usec(t, &usec) < 0) { pa_log_error("Invalid packet."); - unload_module(u); + unload_module(u->module->userdata); return; } } @@ -895,7 +906,7 @@ static void command_request(pa_pdispatch *pd, uint32_t command, uint32_t tag, p return; fail: - unload_module(u); + unload_module(u->module->userdata); } #endif @@ -1010,7 +1021,7 @@ static void stream_get_latency_callback(pa_pdispatch *pd, uint32_t command, uint fail: - unload_module(u); + unload_module(u->module->userdata); } /* Called from main context */ @@ -1145,7 +1156,7 @@ static void server_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa return; fail: - unload_module(u); + unload_module(u->module->userdata); } static int read_ports(struct userdata *u, pa_tagstruct *t) { @@ -1300,7 +1311,7 @@ static void sink_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_t return; fail: - unload_module(u); + unload_module(u->module->userdata); } /* Called from main context */ @@ -1409,7 +1420,7 @@ static void sink_input_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag return; fail: - unload_module(u); + unload_module(u->module->userdata); } #else @@ -1499,7 +1510,7 @@ static void source_info_cb(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa return; fail: - unload_module(u); + unload_module(u->module->userdata); } #endif @@ -1560,7 +1571,7 @@ static void command_subscribe_event(pa_pdispatch *pd, uint32_t command, uint32 if (pa_tagstruct_getu32(t, &e) < 0 || pa_tagstruct_getu32(t, &idx) < 0) { pa_log("Invalid protocol reply"); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -1707,7 +1718,7 @@ parse_error: pa_log("Invalid reply. (Create stream)"); fail: - unload_module(u); + unload_module(u->module->userdata); } @@ -1911,7 +1922,7 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t return; fail: - unload_module(u); + unload_module(u->module->userdata); } /* Called from main context */ @@ -1922,7 +1933,7 @@ static void pstream_die_callback(pa_pstream *p, void *userdata) { pa_assert(u); pa_log_warn("Stream died."); - unload_module(u); + unload_module(u->module->userdata); } /* Called from main context */ @@ -1935,7 +1946,7 @@ static void pstream_packet_callback(pa_pstream *p, pa_packet *packet, pa_cmsg_an if (pa_pdispatch_run(u->pdispatch, packet, ancil_data, u) < 0) { pa_log("Invalid packet"); - unload_module(u); + unload_module(u->module->userdata); return; } } @@ -1951,7 +1962,7 @@ static void pstream_memblock_callback(pa_pstream *p, uint32_t channel, int64_t o if (channel != u->channel) { pa_log("Received memory block on bad channel."); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -1976,7 +1987,7 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata if (!io) { pa_log("Connection failed: %s", pa_cstrerror(errno)); - unload_module(u); + unload_module(u->module->userdata); return; } @@ -1985,14 +1996,14 @@ static void on_connection(pa_socket_client *sc, pa_iochannel *io, void *userdata #ifdef TUNNEL_SINK create_sink(u); if (!u->sink) { - unload_module(u); + unload_module(u->module->userdata); return; } on_sink_created(u); #else create_source(u); if (!u->source) { - unload_module(u); + unload_module(u->module->userdata); return; } on_source_created(u); @@ -2197,7 +2208,7 @@ static int tunnel_process_msg(pa_msgobject *o, int code, void *data, int64_t off switch (code) { case TUNNEL_MESSAGE_MAYBE_RESTART: - unload_module(u); + unload_module(u->module->userdata); break; } @@ -2270,6 +2281,7 @@ done: static int do_init(pa_module *m) { pa_modargs *ma = NULL; struct userdata *u = NULL; + struct module_restart_data *rd; char *server = NULL; uint32_t latency_msec; bool automatic; @@ -2280,13 +2292,16 @@ static int do_init(pa_module *m) { uint32_t reconnect_interval_ms = 0; pa_assert(m); + pa_assert(m->userdata); + + rd = m->userdata; if (!(ma = pa_modargs_new(m->argument, valid_modargs))) { pa_log("Failed to parse module arguments"); goto fail; } - m->userdata = u = pa_xnew0(struct userdata, 1); + rd->userdata = u = pa_xnew0(struct userdata, 1); u->core = m->core; u->module = m; u->client = NULL; @@ -2492,6 +2507,16 @@ static int do_init(pa_module *m) { xcb_disconnect(xcb); #endif + /* If the module is restarting and do_init() finishes successfully, the + * restart data is no longer needed. If do_init() fails, don't touch the + * restart data, because following restart attempts will continue to use + * the same data. If restart_data is NULL, that means no restart is + * currently pending. */ + if (rd->restart_data) { + pa_restart_free(rd->restart_data); + rd->restart_data = NULL; + } + pa_modargs_free(ma); return 0; @@ -2513,10 +2538,13 @@ fail: static void do_done(pa_module *m) { struct userdata *u = NULL; + struct module_restart_data *rd; pa_assert(m); - if (!(u = m->userdata)) + if (!(rd = m->userdata)) + return; + if (!(u = rd->userdata)) return; u->shutting_down = true; @@ -2595,7 +2623,7 @@ static void do_done(pa_module *m) { pa_xfree(u); - m->userdata = NULL; + rd->userdata = NULL; } int pa__init(pa_module *m) { @@ -2603,6 +2631,8 @@ int pa__init(pa_module *m) { pa_assert(m); + m->userdata = pa_xnew0(struct module_restart_data, 1); + ret = do_init(m); if (ret < 0) @@ -2615,4 +2645,13 @@ void pa__done(pa_module *m) { pa_assert(m); do_done(m); + + if (m->userdata) { + struct module_restart_data *rd = m->userdata; + + if (rd->restart_data) + pa_restart_free(rd->restart_data); + + pa_xfree(m->userdata); + } } diff --git a/src/modules/restart-module.c b/src/modules/restart-module.c index d7e855a5d..ea3746455 100644 --- a/src/modules/restart-module.c +++ b/src/modules/restart-module.c @@ -30,31 +30,49 @@ #include <pulsecore/core.h> #include <pulsecore/thread-mq.h> -struct reinit_data { +struct pa_restart_data { init_cb do_init; done_cb do_done; pa_usec_t restart_usec; pa_module *module; + pa_time_event *time_event; + pa_defer_event *defer_event; }; +static void do_reinit(pa_mainloop_api *mainloop, pa_restart_data *rd); + static void call_init(pa_mainloop_api *mainloop, pa_time_event *e, const struct timeval *tv, void *userdata) { - struct reinit_data *rd = userdata; + pa_restart_data *rd = userdata; int ret; + if (rd->time_event) { + mainloop->time_free(rd->time_event); + rd->time_event = NULL; + } + /* now that restart_usec has elapsed, we call do_init to restart the module */ ret = rd->do_init(rd->module); /* if the init failed, we got here because the caller wanted to restart, so * setup another restart */ if (ret < 0) - pa_restart_module_reinit(rd->module, rd->do_init, rd->do_done, rd->restart_usec); + do_reinit(mainloop, rd); +} - pa_xfree(rd); +static void defer_callback(pa_mainloop_api *mainloop, pa_defer_event *e, void *userdata) { + pa_restart_data *rd = userdata; + + pa_assert(rd->defer_event == e); + + mainloop->defer_enable(rd->defer_event, 0); + mainloop->defer_free(rd->defer_event); + rd->defer_event = NULL; + + do_reinit(mainloop, rd); } -static void do_reinit(pa_mainloop_api *mainloop, void *userdata) { - struct reinit_data *rd = userdata; +static void do_reinit(pa_mainloop_api *mainloop, pa_restart_data *rd) { struct timeval tv; pa_assert_ctl_context(); @@ -66,17 +84,20 @@ static void do_reinit(pa_mainloop_api *mainloop, void *userdata) { /* after restart_usec, call do_init to restart the module */ pa_gettimeofday(&tv); pa_timeval_add(&tv, rd->restart_usec); - mainloop->time_new(mainloop, &tv, call_init, rd); + rd->time_event = mainloop->time_new(mainloop, &tv, call_init, rd); } -void pa_restart_module_reinit(pa_module *m, init_cb do_init, done_cb do_done, pa_usec_t restart_usec) { - struct reinit_data *rd; +pa_restart_data *pa_restart_module_reinit(pa_module *m, init_cb do_init, done_cb do_done, pa_usec_t restart_usec) { + pa_restart_data *rd; pa_assert_ctl_context(); + pa_assert(do_init); + pa_assert(do_done); + pa_assert(restart_usec); pa_log_info("Starting reinit for %s", m->name); - rd = pa_xnew0(struct reinit_data, 1); + rd = pa_xnew0(pa_restart_data, 1); rd->do_init = do_init; rd->do_done = do_done; rd->restart_usec = restart_usec; @@ -84,5 +105,25 @@ void pa_restart_module_reinit(pa_module *m, init_cb do_init, done_cb do_done, pa /* defer actually doing a reinit, so that we can safely exit whatever call * chain we're in before we effectively reinit the module */ - pa_mainloop_api_once(m->core->mainloop, do_reinit, rd); + rd->defer_event = m->core->mainloop->defer_new(m->core->mainloop, defer_callback, rd); + m->core->mainloop->defer_enable(rd->defer_event, 1); + + return rd; +} + +void pa_restart_free(pa_restart_data *rd) { + pa_assert_ctl_context(); + pa_assert(rd); + + if (rd->defer_event) { + rd->module->core->mainloop->defer_enable(rd->defer_event, 0); + rd->module->core->mainloop->defer_free(rd->defer_event); + } + + if (rd->time_event) { + pa_log_info("Cancel reinit for %s", rd->module->name); + rd->module->core->mainloop->time_free(rd->time_event); + } + + pa_xfree(rd); } diff --git a/src/modules/restart-module.h b/src/modules/restart-module.h index b1be5f93e..e1a4d7a67 100644 --- a/src/modules/restart-module.h +++ b/src/modules/restart-module.h @@ -29,10 +29,20 @@ extern "C" { #include <pulsecore/core.h> #include <pulsecore/thread-mq.h> +/* Init and exit callbacks of the module */ typedef int (*init_cb)(pa_module *m); typedef void (*done_cb)(pa_module *m); +/* Restart data structure */ +typedef struct pa_restart_data pa_restart_data; -void pa_restart_module_reinit(pa_module *m, init_cb do_init, done_cb do_done, pa_usec_t restart_usec); +/* Tears down the module using the done callback and schedules a restart after restart_usec. + * Returns a handle to the restart event. When the init callback finishes successfully during + * restart or when the restart should be cancelled, the restart event must be destroyed using + * pa_restart_free(). */ +pa_restart_data *pa_restart_module_reinit(pa_module *m, init_cb do_init, done_cb do_done, pa_usec_t restart_usec); + +/* Free the restart event */ +void pa_restart_free(pa_restart_data *data); #ifdef __cplusplus } |