diff options
author | Olivier Crête <olivier.crete@collabora.com> | 2014-02-24 18:50:59 -0500 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2014-02-24 18:56:42 -0500 |
commit | ca0f5c6d8d1b69c1c334b0826a56796905e6c9d8 (patch) | |
tree | ddd2939ccd57d232283326028e895766674b7fdc | |
parent | 0fc36cb22805d8a08e7b5b27fa6174ae694239e6 (diff) | |
download | libnice-ca0f5c6d8d1b69c1c334b0826a56796905e6c9d8.tar.gz |
agent: Delay signal emission after the lock has been released
This way, there can be no annoying re-entrancy in our code.
-rw-r--r-- | agent/agent-priv.h | 3 | ||||
-rw-r--r-- | agent/agent.c | 202 | ||||
-rw-r--r-- | agent/component.c | 4 | ||||
-rw-r--r-- | agent/conncheck.c | 19 | ||||
-rw-r--r-- | agent/discovery.c | 4 |
5 files changed, 161 insertions, 71 deletions
diff --git a/agent/agent-priv.h b/agent/agent-priv.h index 1745c12..37e908c 100644 --- a/agent/agent-priv.h +++ b/agent/agent-priv.h @@ -158,6 +158,8 @@ struct _NiceAgent #endif gchar *software_attribute; /* SOFTWARE attribute */ gboolean reliable; /* property: reliable */ + + GQueue pending_signals; /* XXX: add pointer to internal data struct for ABI-safe extensions */ }; @@ -176,6 +178,7 @@ void agent_signal_gathering_done (NiceAgent *agent); void agent_lock (void); void agent_unlock (void); +void agent_unlock_and_emit (NiceAgent *agent); void agent_signal_new_selected_pair ( NiceAgent *agent, diff --git a/agent/agent.c b/agent/agent.c index 2a0e429..021f0e7 100644 --- a/agent/agent.c +++ b/agent/agent.c @@ -46,6 +46,7 @@ #endif #include <glib.h> +#include <gobject/gvaluecollector.h> #include <string.h> #include <errno.h> @@ -157,6 +158,83 @@ void agent_unlock(void) #endif +typedef struct { + guint signal_id; + GSignalQuery query; + GValue *params; +} QueuedSignal; + + +static void +free_queued_signal (QueuedSignal *sig) +{ + guint i; + + for (i = 0; i < sig->query.n_params; i++) { + if (G_VALUE_HOLDS_POINTER (&sig->params[i])) + g_free (g_value_get_pointer (&sig->params[i])); + g_value_unset (&sig->params[i]); + } + + g_slice_free1 (sizeof(GValue) * (sig->query.n_params + 1), sig->params); + g_slice_free (QueuedSignal, sig); +} + +void +agent_unlock_and_emit (NiceAgent *agent) +{ + GQueue queue = G_QUEUE_INIT; + QueuedSignal *sig; + + queue = agent->pending_signals; + g_queue_init (&agent->pending_signals); + + agent_unlock (); + + while ((sig = g_queue_pop_head (&queue))) { + g_signal_emitv (sig->params, sig->signal_id, 0, NULL); + + free_queued_signal (sig); + } +} + +static void +agent_queue_signal (NiceAgent *agent, guint signal_id, ...) +{ + QueuedSignal *sig; + guint i; + gchar *error = NULL; + va_list var_args; + + sig = g_slice_new (QueuedSignal); + g_signal_query (signal_id, &sig->query); + + sig->signal_id = signal_id; + sig->params = g_slice_alloc0 (sizeof(GValue) * (sig->query.n_params + 1)); + + g_value_init (&sig->params[0], G_TYPE_OBJECT); + g_value_set_object (&sig->params[0], agent); + + va_start (var_args, signal_id); + for (i = 0; i < sig->query.n_params; i++) { + G_VALUE_COLLECT_INIT (&sig->params[i + 1], sig->query.param_types[i], + var_args, 0, &error); + if (error) + break; + } + va_end (var_args); + + if (error) { + free_queued_signal (sig); + g_critical ("Error collecting values for signal: %s", error); + g_free (error); + return; + } + + g_queue_push_tail (&agent->pending_signals, sig); +} + + StunUsageIceCompatibility agent_to_ice_compatibility (NiceAgent *agent) { @@ -735,6 +813,8 @@ nice_agent_init (NiceAgent *agent) agent->rng = nice_rng_new (); priv_generate_tie_breaker (agent); + + g_queue_init (&agent->pending_signals); } @@ -854,7 +934,7 @@ nice_agent_get_property ( G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); } - agent_unlock(); + agent_unlock_and_emit(agent); } @@ -984,7 +1064,7 @@ nice_agent_set_property ( G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); } - agent_unlock(); + agent_unlock_and_emit (agent); } @@ -1025,7 +1105,8 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data) nice_debug ("Agent %p: s%d:%d pseudo Tcp socket Opened", agent, stream->id, component->id); g_cancellable_cancel (component->tcp_writable_cancellable); - g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0, + + agent_queue_signal (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], stream->id, component->id); } @@ -1283,7 +1364,7 @@ pseudo_tcp_socket_writable (PseudoTcpSocket *sock, gpointer user_data) nice_debug ("Agent %p: s%d:%d pseudo Tcp socket writable", agent, stream->id, component->id); g_cancellable_cancel (component->tcp_writable_cancellable); - g_signal_emit (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], 0, + agent_queue_signal (agent, signals[SIGNAL_RELIABLE_TRANSPORT_WRITABLE], stream->id, component->id); } @@ -1356,7 +1437,7 @@ notify_pseudo_tcp_socket_clock (gpointer user_data) pseudo_tcp_socket_notify_clock (component->tcp); adjust_tcp_clock (agent, stream, component); - agent_unlock(); + agent_unlock_and_emit (agent); return G_SOURCE_CONTINUE; } @@ -1456,7 +1537,8 @@ void agent_signal_gathering_done (NiceAgent *agent) Stream *stream = i->data; if (stream->gathering) { stream->gathering = FALSE; - g_signal_emit (agent, signals[SIGNAL_CANDIDATE_GATHERING_DONE], 0, stream->id); + agent_queue_signal (agent, signals[SIGNAL_CANDIDATE_GATHERING_DONE], + stream->id); } } } @@ -1465,7 +1547,8 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, Stream *st { if (stream->initial_binding_request_received != TRUE) { stream->initial_binding_request_received = TRUE; - g_signal_emit (agent, signals[SIGNAL_INITIAL_BINDING_REQUEST_RECEIVED], 0, stream->id); + agent_queue_signal (agent, signals[SIGNAL_INITIAL_BINDING_REQUEST_RECEIVED], + stream->id); } } @@ -1551,7 +1634,7 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co lf_copy = g_strdup (local_foundation); rf_copy = g_strdup (remote_foundation); - g_signal_emit (agent, signals[SIGNAL_NEW_SELECTED_PAIR], 0, + agent_queue_signal (agent, signals[SIGNAL_NEW_SELECTED_PAIR], stream_id, component_id, lf_copy, rf_copy); g_free (lf_copy); @@ -1560,18 +1643,14 @@ void agent_signal_new_selected_pair (NiceAgent *agent, guint stream_id, guint co void agent_signal_new_candidate (NiceAgent *agent, NiceCandidate *candidate) { - g_signal_emit (agent, signals[SIGNAL_NEW_CANDIDATE], 0, - candidate->stream_id, - candidate->component_id, - candidate->foundation); + agent_queue_signal (agent, signals[SIGNAL_NEW_CANDIDATE], + candidate->stream_id, candidate->component_id, candidate->foundation); } void agent_signal_new_remote_candidate (NiceAgent *agent, NiceCandidate *candidate) { - g_signal_emit (agent, signals[SIGNAL_NEW_REMOTE_CANDIDATE], 0, - candidate->stream_id, - candidate->component_id, - candidate->foundation); + agent_queue_signal (agent, signals[SIGNAL_NEW_REMOTE_CANDIDATE], + candidate->stream_id, candidate->component_id, candidate->foundation); } static const gchar * @@ -1623,8 +1702,8 @@ void agent_signal_component_state_change (NiceAgent *agent, guint stream_id, gui process_queued_tcp_packets (agent, stream, component); - g_signal_emit (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], 0, - stream_id, component_id, state); + agent_queue_signal (agent, signals[SIGNAL_COMPONENT_STATE_CHANGED], + stream_id, component_id, state); } } @@ -1820,7 +1899,7 @@ nice_agent_add_stream ( ret = stream->id; - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -1851,7 +1930,7 @@ nice_agent_set_relay_info(NiceAgent *agent, nice_address_set_port (&turn->server, server_port); } else { g_slice_free (TurnServer, turn); - agent_unlock(); + agent_unlock_and_emit (agent); return FALSE; } @@ -1866,7 +1945,7 @@ nice_agent_set_relay_info(NiceAgent *agent, component->turn_servers = g_list_append (component->turn_servers, turn); } - agent_unlock(); + agent_unlock_and_emit (agent); return TRUE; } @@ -1901,7 +1980,7 @@ static gboolean priv_upnp_timeout_cb (gpointer user_data) agent_gathering_done (agent); - agent_unlock(); + agent_unlock_and_emit (agent); return FALSE; } @@ -1967,7 +2046,7 @@ static void _upnp_mapped_external_port (GUPnPSimpleIgd *self, gchar *proto, agent_gathering_done (agent); } - agent_unlock(); + agent_unlock_and_emit (agent); } static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error, @@ -2004,7 +2083,7 @@ static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error, } } - agent_unlock(); + agent_unlock_and_emit (agent); } #endif @@ -2024,7 +2103,7 @@ nice_agent_gather_candidates ( stream = agent_find_stream (agent, stream_id); if (stream == NULL) { - agent_unlock(); + agent_unlock_and_emit (agent); return FALSE; } @@ -2229,7 +2308,7 @@ nice_agent_gather_candidates ( discovery_prune_stream (agent, stream_id); } - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -2283,7 +2362,7 @@ nice_agent_remove_stream ( stream = agent_find_stream (agent, stream_id); if (!stream) { - agent_unlock (); + agent_unlock_and_emit (agent); return; } @@ -2299,10 +2378,9 @@ nice_agent_remove_stream ( if (!agent->streams) priv_remove_keepalive_timer (agent); - agent_unlock (); - - g_signal_emit (agent, signals[SIGNAL_STREAMS_REMOVED], 0, stream_ids); + agent_queue_signal (agent, signals[SIGNAL_STREAMS_REMOVED], stream_ids); + agent_unlock_and_emit (agent); return; } @@ -2319,7 +2397,7 @@ nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id component->max_port = max_port; } - agent_unlock(); + agent_unlock_and_emit (agent); } NICEAPI_EXPORT gboolean @@ -2333,7 +2411,7 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr) nice_address_set_port (dup, 0); agent->local_addresses = g_slist_append (agent->local_addresses, dup); - agent_unlock(); + agent_unlock_and_emit (agent); return TRUE; } @@ -2466,7 +2544,7 @@ nice_agent_set_remote_credentials ( } done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -2497,7 +2575,7 @@ nice_agent_get_local_credentials ( done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -2571,7 +2649,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo added = _set_remote_candidates_locked (agent, stream, component, candidates); done: - agent_unlock(); + agent_unlock_and_emit (agent); return added; } @@ -3204,10 +3282,18 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent, memcpy (&prev_recv_messages_iter, &component->recv_messages_iter, sizeof (NiceInputMessageIter)); - agent_unlock (); + + agent_unlock_and_emit (agent); g_main_context_iteration (context, blocking); agent_lock (); + if (!agent_find_component (agent, stream_id, component_id, + &stream, &component)) { + g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE, + "Component removed during call."); + goto done; + } + received_enough = nice_input_message_iter_is_at_end (&component->recv_messages_iter, component->recv_messages, component->n_recv_messages); @@ -3249,7 +3335,7 @@ done: if (child_error != NULL) g_propagate_error (error, child_error); - agent_unlock (); + agent_unlock_and_emit (agent); if (messages_orig) { for (i = 0; i < n_messages; i++) { @@ -3429,7 +3515,7 @@ done: if (child_error != NULL) g_propagate_error (error, child_error); - agent_unlock (); + agent_unlock_and_emit (agent); return n_sent; } @@ -3502,7 +3588,7 @@ nice_agent_get_local_candidates ( ret = g_slist_append (ret, nice_candidate_copy (item->data)); done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -3526,7 +3612,7 @@ nice_agent_get_remote_candidates ( ret = g_slist_append (ret, nice_candidate_copy (item->data)); done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -3554,7 +3640,7 @@ nice_agent_restart ( res = stream_restart (stream, agent->rng); } - agent_unlock(); + agent_unlock_and_emit (agent); return res; } @@ -3791,7 +3877,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data) done: g_object_unref (agent); - agent_unlock (); + agent_unlock_and_emit (agent); return !remove_source; } @@ -3840,7 +3926,7 @@ nice_agent_attach_recv ( } done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -3888,7 +3974,7 @@ nice_agent_set_selected_pair ( ret = TRUE; done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -3914,7 +4000,7 @@ nice_agent_get_selected_pair (NiceAgent *agent, guint stream_id, } done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -3946,7 +4032,7 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id, g_socket = g_object_ref (nice_socket->fileno); done: - agent_unlock(); + agent_unlock_and_emit (agent); return g_socket; } @@ -4013,7 +4099,7 @@ nice_agent_set_selected_remote_candidate ( ret = TRUE; done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -4060,7 +4146,7 @@ nice_agent_set_stream_tos (NiceAgent *agent, } done: - agent_unlock(); + agent_unlock_and_emit (agent); } NICEAPI_EXPORT void @@ -4075,7 +4161,7 @@ nice_agent_set_software (NiceAgent *agent, const gchar *software) stun_agent_set_software (&agent->stun_agent, agent->software_attribute); - agent_unlock (); + agent_unlock_and_emit (agent); } NICEAPI_EXPORT gboolean @@ -4109,7 +4195,7 @@ nice_agent_set_stream_name (NiceAgent *agent, guint stream_id, ret = TRUE; done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -4129,7 +4215,7 @@ nice_agent_get_stream_name (NiceAgent *agent, guint stream_id) name = stream->name; done: - agent_unlock(); + agent_unlock_and_emit (agent); return name; } @@ -4199,7 +4285,7 @@ nice_agent_get_default_local_candidate (NiceAgent *agent, default_candidate = nice_candidate_copy (default_candidate); done: - agent_unlock (); + agent_unlock_and_emit (agent); return default_candidate; } @@ -4311,7 +4397,7 @@ nice_agent_generate_local_sdp (NiceAgent *agent) _generate_stream_sdp (agent, stream, sdp, TRUE); } - agent_unlock(); + agent_unlock_and_emit (agent); return g_string_free (sdp, FALSE); } @@ -4335,7 +4421,7 @@ nice_agent_generate_local_stream_sdp (NiceAgent *agent, guint stream_id, ret = g_string_free (sdp, FALSE); done: - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -4353,7 +4439,7 @@ nice_agent_generate_local_candidate_sdp (NiceAgent *agent, sdp = g_string_new (NULL); _generate_candidate_sdp (agent, candidate, sdp); - agent_unlock(); + agent_unlock_and_emit (agent); return g_string_free (sdp, FALSE); } @@ -4447,7 +4533,7 @@ nice_agent_parse_remote_sdp (NiceAgent *agent, const gchar *sdp) if (sdp_lines) g_strfreev(sdp_lines); - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -4492,7 +4578,7 @@ nice_agent_parse_remote_stream_sdp (NiceAgent *agent, guint stream_id, if (sdp_lines) g_strfreev(sdp_lines); - agent_unlock(); + agent_unlock_and_emit (agent); return candidates; } @@ -4625,7 +4711,7 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id, iostream = g_object_ref (component->iostream); done: - agent_unlock (); + agent_unlock_and_emit (agent); return iostream; } diff --git a/agent/component.c b/agent/component.c index 29c134d..1ddd586 100644 --- a/agent/component.c +++ b/agent/component.c @@ -752,7 +752,7 @@ component_emit_io_callback (Component *component, * handler. */ if (g_main_context_is_owner (component->ctx)) { /* Thread owns the main context, so invoke the callback directly. */ - agent_unlock (); + agent_unlock_and_emit (agent); io_callback (agent, stream_id, component_id, buf_len, (gchar *) buf, io_user_data); agent_lock (); @@ -907,7 +907,7 @@ component_source_prepare (GSource *source, gint *timeout_) done: - agent_unlock (); + agent_unlock_and_emit (agent); /* We can’t be sure if the ComponentSource itself needs to be dispatched until * poll() is called on all the child sources. */ diff --git a/agent/conncheck.c b/agent/conncheck.c index 1ec0976..8577941 100644 --- a/agent/conncheck.c +++ b/agent/conncheck.c @@ -388,10 +388,9 @@ static gboolean priv_conn_check_tick_stream (Stream *stream, NiceAgent *agent, G * * @return will return FALSE when no more pending timers. */ -static gboolean priv_conn_check_tick_unlocked (gpointer pointer) +static gboolean priv_conn_check_tick_unlocked (NiceAgent *agent) { CandidateCheckPair *pair = NULL; - NiceAgent *agent = pointer; gboolean keep_timer_going = FALSE; GSList *i, *j; GTimeVal now; @@ -454,6 +453,7 @@ static gboolean priv_conn_check_tick_unlocked (gpointer pointer) static gboolean priv_conn_check_tick (gpointer pointer) { gboolean ret; + NiceAgent *agent = pointer; agent_lock(); if (g_source_is_destroyed (g_main_current_source ())) { @@ -462,8 +462,9 @@ static gboolean priv_conn_check_tick (gpointer pointer) agent_unlock (); return FALSE; } - ret = priv_conn_check_tick_unlocked (pointer); - agent_unlock(); + + ret = priv_conn_check_tick_unlocked (agent); + agent_unlock_and_emit (agent); return ret; } @@ -539,7 +540,7 @@ static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer) } - agent_unlock (); + agent_unlock_and_emit (pair->keepalive.agent); return FALSE; } @@ -723,7 +724,7 @@ static gboolean priv_conn_keepalive_tick (gpointer pointer) agent->keepalive_timer_source = NULL; } } - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -782,7 +783,7 @@ static gboolean priv_turn_allocate_refresh_retransmissions_tick (gpointer pointe } - agent_unlock (); + agent_unlock_and_emit (cand->agent); return FALSE; } @@ -867,7 +868,7 @@ static gboolean priv_turn_allocate_refresh_tick (gpointer pointer) } priv_turn_allocate_refresh_tick_unlocked (cand); - agent_unlock (); + agent_unlock_and_emit (cand->agent); return FALSE; } @@ -887,7 +888,7 @@ gboolean conn_check_schedule_next (NiceAgent *agent) nice_debug ("Agent %p : WARN: starting conn checks before local candidate gathering is finished.", agent); /* step: call once imediately */ - res = priv_conn_check_tick_unlocked ((gpointer) agent); + res = priv_conn_check_tick_unlocked (agent); nice_debug ("Agent %p : priv_conn_check_tick_unlocked returned %d", agent, res); /* step: schedule timer if not running yet */ diff --git a/agent/discovery.c b/agent/discovery.c index d4c2ab2..049aa56 100644 --- a/agent/discovery.c +++ b/agent/discovery.c @@ -1029,7 +1029,7 @@ static gboolean priv_discovery_tick (gpointer pointer) agent->discovery_timer_source = NULL; } } - agent_unlock(); + agent_unlock_and_emit (agent); return ret; } @@ -1045,7 +1045,7 @@ void discovery_schedule (NiceAgent *agent) g_assert (agent->discovery_list != NULL); if (agent->discovery_unsched_items > 0) { - + if (agent->discovery_timer_source == NULL) { /* step: run first iteration immediately */ gboolean res = priv_discovery_tick_unlocked (agent); |