diff options
author | Juan Navarro <juan.navarro@gmx.es> | 2018-08-20 18:01:02 +0200 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2018-10-28 14:47:32 +0000 |
commit | da41258a21102f63ec5d5b2dc20d303f772eb195 (patch) | |
tree | ad9165967d92e82b57678e1d9d967bfe9767fa98 /agent/agent.c | |
parent | 78bdcfad5738d21b200ec283918dfd93e17b3d85 (diff) | |
download | libnice-da41258a21102f63ec5d5b2dc20d303f772eb195.tar.gz |
Use per-agent locks and GWeakRefs in callbacks from timeout sources
Work on libnice's bug #1 in Gitlab. This work is composed of multiple
merged parts:
- "Global lock contention removed"
Phabricator D1900: https://phabricator.freedesktop.org/D1900
By @nifigase
Opened in GitLab as Merge Request !12
- "agent: properly handle NiceAgent ref in callbacks from timeout
sources"
Phabricator D1898: https://phabricator.freedesktop.org/D1898
By @mparis
This patch was itself based upon a previous version of the work done in
D1900. After the switch of hosting, it got lost.
On top of these, additions to follow some review comments from @ocrete:
- https://phabricator.freedesktop.org/D1900#40412
- https://phabricator.freedesktop.org/D1898#39332
Diffstat (limited to 'agent/agent.c')
-rw-r--r-- | agent/agent.c | 251 |
1 files changed, 161 insertions, 90 deletions
diff --git a/agent/agent.c b/agent/agent.c index db3067e..7f444b9 100644 --- a/agent/agent.c +++ b/agent/agent.c @@ -140,8 +140,6 @@ enum static guint signals[N_SIGNALS]; -static GMutex agent_mutex; /* Mutex used for thread-safe lib */ - static void priv_stop_upnp (NiceAgent *agent); static void pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data); @@ -159,14 +157,14 @@ static void nice_agent_get_property (GObject *object, static void nice_agent_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec); -void agent_lock (void) +void agent_lock (NiceAgent *agent) { - g_mutex_lock (&agent_mutex); + g_mutex_lock (&agent->agent_mutex); } -void agent_unlock (void) +void agent_unlock (NiceAgent *agent) { - g_mutex_unlock (&agent_mutex); + g_mutex_unlock (&agent->agent_mutex); } static GType _nice_agent_stream_ids_get_type (void); @@ -208,7 +206,7 @@ agent_unlock_and_emit (NiceAgent *agent) queue = agent->pending_signals; g_queue_init (&agent->pending_signals); - agent_unlock (); + agent_unlock (agent); while ((sig = g_queue_pop_head (&queue))) { g_signal_emitv (sig->params, sig->signal_id, 0, NULL); @@ -1185,6 +1183,8 @@ nice_agent_init (NiceAgent *agent) priv_generate_tie_breaker (agent); g_queue_init (&agent->pending_signals); + + g_mutex_init (&agent->agent_mutex); } @@ -1241,7 +1241,7 @@ nice_agent_get_property ( { NiceAgent *agent = NICE_AGENT (object); - agent_lock(); + agent_lock (agent); switch (property_id) { @@ -1441,7 +1441,7 @@ nice_agent_set_property ( { NiceAgent *agent = NICE_AGENT (object); - agent_lock(); + agent_lock (agent); switch (property_id) { @@ -1977,29 +1977,17 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *psocket, static gboolean -notify_pseudo_tcp_socket_clock (gpointer user_data) +notify_pseudo_tcp_socket_clock_agent_locked (NiceAgent *agent, + gpointer user_data) { NiceComponent *component = user_data; NiceStream *stream; - NiceAgent *agent; - - agent_lock(); stream = component->stream; - agent = component->agent; - - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in notify_pseudo_tcp_socket_clock"); - agent_unlock (); - return FALSE; - } pseudo_tcp_socket_notify_clock (component->tcp); adjust_tcp_clock (agent, stream, component); - agent_unlock_and_emit (agent); - return G_SOURCE_CONTINUE; } @@ -2023,7 +2011,7 @@ adjust_tcp_clock (NiceAgent *agent, NiceStream *stream, NiceComponent *component interval = G_MAXINT; agent_timeout_add_with_context (agent, &component->tcp_clock, "Pseudo-TCP clock", interval, - notify_pseudo_tcp_socket_clock, component); + notify_pseudo_tcp_socket_clock_agent_locked, component); } } } else { @@ -2042,13 +2030,13 @@ _tcp_sock_is_writable (NiceSocket *sock, gpointer user_data) NiceAgent *agent = component->agent; NiceStream *stream = component->stream; - agent_lock (); + agent_lock (agent); /* Don't signal writable if the socket that has become writable is not * the selected pair */ if (component->selected_pair.local == NULL || !nice_socket_is_based_on (component->selected_pair.local->sockptr, sock)) { - agent_unlock (); + agent_unlock (agent); return; } @@ -2614,7 +2602,7 @@ nice_agent_add_stream ( g_return_val_if_fail (NICE_IS_AGENT (agent), 0); g_return_val_if_fail (n_components >= 1, 0); - agent_lock(); + agent_lock (agent); stream = nice_stream_new (n_components, agent); agent->streams = g_slist_append (agent->streams, stream); @@ -2663,7 +2651,7 @@ nice_agent_set_relay_info(NiceAgent *agent, g_return_val_if_fail (password, FALSE); g_return_val_if_fail (type <= NICE_RELAY_TYPE_TURN_TLS, FALSE); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -2716,18 +2704,9 @@ nice_agent_set_relay_info(NiceAgent *agent, static void agent_check_upnp_gathering_done (NiceAgent *agent); -static gboolean priv_upnp_timeout_cb (gpointer user_data) +static gboolean priv_upnp_timeout_cb_agent_locked (NiceAgent *agent, + gpointer user_data) { - NiceAgent *agent = (NiceAgent*)user_data; - - agent_lock(); - - /* If the source has been destroyed, we have already freed all mappings. */ - if (g_source_is_destroyed (g_main_current_source ())) { - agent_unlock (); - return FALSE; - } - nice_debug ("Agent %p : UPnP port mapping timed out", agent); /* We cannot free priv->upnp here as it may be holding mappings open which @@ -2737,7 +2716,6 @@ static gboolean priv_upnp_timeout_cb (gpointer user_data) agent_check_upnp_gathering_done (agent); - agent_unlock_and_emit (agent); return FALSE; } @@ -2772,7 +2750,7 @@ static void _upnp_mapped_external_port (GUPnPSimpleIgd *self, gchar *proto, NiceCandidateTransport transport; GSList *i, *j, *k; - agent_lock(); + agent_lock (agent); if (agent->upnp_timer_source == NULL) goto end; @@ -2842,7 +2820,7 @@ static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error, NiceAddress localaddr; GSList *i; - agent_lock(); + agent_lock (agent); nice_debug ("Agent %p : Error mapping %s:%d to %d (%d) : %s", agent, local_ip, local_port, external_port, error->domain, error->message); @@ -2880,7 +2858,7 @@ nice_agent_gather_candidates ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -3047,7 +3025,8 @@ nice_agent_gather_candidates ( agent->upnp_mapping = g_slist_prepend (agent->upnp_mapping, base_addr); agent_timeout_add_with_context (agent, &agent->upnp_timer_source, - "UPnP timeout", agent->upnp_timeout, priv_upnp_timeout_cb, agent); + "UPnP timeout", agent->upnp_timeout, + priv_upnp_timeout_cb_agent_locked, agent); } #endif @@ -3221,7 +3200,7 @@ nice_agent_remove_stream ( g_return_if_fail (NICE_IS_AGENT (agent)); g_return_if_fail (stream_id >= 1); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (!stream) { @@ -3265,7 +3244,7 @@ nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id g_return_if_fail (stream_id >= 1); g_return_if_fail (component_id >= 1); - agent_lock(); + agent_lock (agent); if (agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -3288,7 +3267,7 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr) g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (addr != NULL, FALSE); - agent_lock(); + agent_lock (agent); dupaddr = nice_address_dup (addr); nice_address_set_port (dupaddr, 0); @@ -3489,7 +3468,7 @@ nice_agent_set_remote_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); /* note: oddly enough, ufrag and pwd can be empty strings */ @@ -3522,7 +3501,7 @@ nice_agent_set_local_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock (); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); @@ -3553,7 +3532,7 @@ nice_agent_get_local_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -3623,7 +3602,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo nice_debug ("Agent %p: set_remote_candidates %d %d", agent, stream_id, component_id); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4422,7 +4401,7 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent, } } - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4516,9 +4495,9 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent, memcpy (&prev_recv_messages_iter, &component->recv_messages_iter, sizeof (NiceInputMessageIter)); - agent_unlock (); + agent_unlock (agent); g_main_context_iteration (context, blocking); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4709,7 +4688,7 @@ nice_agent_send_messages_nonblocking_internal ( g_assert (n_messages == 1 || !allow_partial); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4956,7 +4935,7 @@ nice_agent_get_local_candidates ( g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { goto done; @@ -4990,7 +4969,7 @@ nice_agent_get_remote_candidates ( g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { goto done; @@ -5010,7 +4989,7 @@ nice_agent_restart ( { GSList *i; - agent_lock(); + agent_lock (agent); /* step: regenerate tie-breaker value */ priv_generate_tie_breaker (agent); @@ -5040,7 +5019,7 @@ nice_agent_restart_stream ( gboolean res = FALSE; NiceStream *stream; - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (!stream) { @@ -5066,6 +5045,8 @@ nice_agent_dispose (GObject *object) QueuedSignal *sig; NiceAgent *agent = NICE_AGENT (object); + agent_lock (agent); + /* step: free resources for the binding discovery timers */ discovery_free (agent); g_assert (agent->discovery_list == NULL); @@ -5131,6 +5112,10 @@ nice_agent_dispose (GObject *object) g_main_context_unref (agent->main_context); agent->main_context = NULL; + agent_unlock (agent); + + g_mutex_clear (&agent->agent_mutex); + if (G_OBJECT_CLASS (nice_agent_parent_class)->dispose) G_OBJECT_CLASS (nice_agent_parent_class)->dispose (object); @@ -5146,20 +5131,20 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data) gboolean has_io_callback; gboolean remove_source = FALSE; - agent_lock (); + component = socket_source->component; + agent = component->agent; + stream = component->stream; + + agent_lock (agent); if (g_source_is_destroyed (g_main_current_source ())) { /* Silently return FALSE. */ nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ()); - agent_unlock (); + agent_unlock (agent); return G_SOURCE_REMOVE; } - component = socket_source->component; - agent = component->agent; - stream = component->stream; - g_object_ref (agent); /* Remove disconnected sockets when we get a HUP */ @@ -5176,7 +5161,7 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data) } nice_component_remove_socket (component, socket_source->socket); - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); return G_SOURCE_REMOVE; } @@ -5349,7 +5334,7 @@ done: if (component->n_recv_messages == 0 && component->recv_messages == NULL) { agent_unlock_and_emit (agent); } else { - agent_unlock (); + agent_unlock (agent); } g_object_unref (agent); @@ -5380,7 +5365,7 @@ nice_agent_attach_recv ( g_return_val_if_fail (stream_id >= 1, FALSE); g_return_val_if_fail (component_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); /* attach candidates */ @@ -5435,7 +5420,7 @@ nice_agent_set_selected_pair ( g_return_val_if_fail (lfoundation, FALSE); g_return_val_if_fail (rfoundation, FALSE); - agent_lock(); + agent_lock (agent); /* step: check that params specify an existing pair */ if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -5497,7 +5482,7 @@ nice_agent_get_selected_pair (NiceAgent *agent, guint stream_id, g_return_val_if_fail (local != NULL, FALSE); g_return_val_if_fail (remote != NULL, FALSE); - agent_lock(); + agent_lock (agent); /* step: check that params specify an existing pair */ if (!agent_find_component (agent, stream_id, component_id, @@ -5529,7 +5514,7 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); /* Reliable streams are pseudotcp or MUST use RFC 4571 framing */ if (agent->reliable) @@ -5560,16 +5545,82 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id, return g_socket; } +typedef struct _TimeoutData +{ + GWeakRef/*<NiceAgent>*/ agent_ref; + NiceTimeoutLockedCallback function; + gpointer user_data; +} TimeoutData; + +static void +timeout_data_destroy (TimeoutData *data) +{ + g_weak_ref_clear (&data->agent_ref); + g_slice_free (TimeoutData, data); +} + +static TimeoutData * +timeout_data_new (NiceAgent *agent, NiceTimeoutLockedCallback function, + gpointer user_data) +{ + TimeoutData *data = g_slice_new0 (TimeoutData); + + g_weak_ref_init (&data->agent_ref, agent); + data->function = function; + data->user_data = user_data; + + return data; +} + +static gboolean +timeout_cb (gpointer user_data) +{ + TimeoutData *data = user_data; + NiceAgent *agent; + gboolean ret = G_SOURCE_REMOVE; + + agent = g_weak_ref_get (&data->agent_ref); + if (agent == NULL) { + return G_SOURCE_REMOVE; + } + + agent_lock (agent); + + /* A race condition might happen where the mutex above waits for the lock + * and in the meantime another thread destroys the source. + * In that case, we don't need to run the function since it should + * have been cancelled */ + if (g_source_is_destroyed (g_main_current_source ())) { + nice_debug ("Source was destroyed. Avoided race condition in timeout_cb"); + + agent_unlock (agent); + goto end; + } + + ret = data->function (agent, data->user_data); + + agent_unlock_and_emit (agent); + + end: + g_object_unref (agent); + + return ret; +} + /* Create a new timer GSource with the given @name, @interval, callback * @function and @data, and assign it to @out, destroying and freeing any * existing #GSource in @out first. * * This guarantees that a timer won’t be overwritten without being destroyed. + * + * @interval is given in milliseconds. */ -void agent_timeout_add_with_context (NiceAgent *agent, GSource **out, - const gchar *name, guint interval, GSourceFunc function, gpointer data) +static void agent_timeout_add_with_context_internal (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, gboolean seconds, + NiceTimeoutLockedCallback function, gpointer user_data) { GSource *source; + TimeoutData *data; g_return_if_fail (function != NULL); g_return_if_fail (out != NULL); @@ -5582,16 +5633,36 @@ void agent_timeout_add_with_context (NiceAgent *agent, GSource **out, } /* Create the new source. */ - source = g_timeout_source_new (interval); + if (seconds) + source = g_timeout_source_new_seconds (interval); + else + source = g_timeout_source_new (interval); g_source_set_name (source, name); - g_source_set_callback (source, function, data, NULL); + data = timeout_data_new (agent, function, user_data); + g_source_set_callback (source, timeout_cb, data, + (GDestroyNotify)timeout_data_destroy); g_source_attach (source, agent->main_context); /* Return it! */ *out = source; } +void agent_timeout_add_with_context (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, + NiceTimeoutLockedCallback function, gpointer user_data) +{ + agent_timeout_add_with_context_internal (agent, out, name, interval, FALSE, + function, user_data); +} + +void agent_timeout_add_seconds_with_context (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, + NiceTimeoutLockedCallback function, gpointer user_data) +{ + agent_timeout_add_with_context_internal (agent, out, name, interval, TRUE, + function, user_data); +} NICEAPI_EXPORT gboolean nice_agent_set_selected_remote_candidate ( @@ -5612,7 +5683,7 @@ nice_agent_set_selected_remote_candidate ( g_return_val_if_fail (component_id != 0, FALSE); g_return_val_if_fail (candidate != NULL, FALSE); - agent_lock(); + agent_lock (agent); /* step: check if the component exists*/ if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -5701,7 +5772,7 @@ nice_agent_set_stream_tos (NiceAgent *agent, g_return_if_fail (NICE_IS_AGENT (agent)); g_return_if_fail (stream_id >= 1); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -5727,7 +5798,7 @@ nice_agent_set_software (NiceAgent *agent, const gchar *software) { g_return_if_fail (NICE_IS_AGENT (agent)); - agent_lock(); + agent_lock (agent); g_free (agent->software_attribute); if (software) @@ -5762,7 +5833,7 @@ nice_agent_set_stream_name (NiceAgent *agent, guint stream_id, " are valid", name); } - agent_lock(); + agent_lock (agent); for (i = agent->streams; i; i = i->next) { NiceStream *stream = i->data; @@ -5797,7 +5868,7 @@ nice_agent_get_stream_name (NiceAgent *agent, guint stream_id) g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (stream_id >= 1, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -5871,7 +5942,7 @@ nice_agent_get_default_local_candidate (NiceAgent *agent, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock (); + agent_lock (agent); /* step: check if the component exists*/ if (!agent_find_component (agent, stream_id, component_id, @@ -6030,7 +6101,7 @@ nice_agent_generate_local_sdp (NiceAgent *agent) g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); - agent_lock(); + agent_lock (agent); for (i = agent->streams; i; i = i->next) { NiceStream *stream = i->data; @@ -6054,7 +6125,7 @@ nice_agent_generate_local_stream_sdp (NiceAgent *agent, guint stream_id, g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (stream_id >= 1, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -6079,7 +6150,7 @@ nice_agent_generate_local_candidate_sdp (NiceAgent *agent, g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (candidate != NULL, NULL); - agent_lock(); + agent_lock (agent); sdp = g_string_new (NULL); _generate_candidate_sdp (agent, candidate, sdp); @@ -6101,7 +6172,7 @@ nice_agent_parse_remote_sdp (NiceAgent *agent, const gchar *sdp) g_return_val_if_fail (NICE_IS_AGENT (agent), -1); g_return_val_if_fail (sdp != NULL, -1); - agent_lock(); + agent_lock (agent); for (l = agent->streams; l; l = l->next) { NiceStream *stream = l->data; @@ -6193,7 +6264,7 @@ nice_agent_parse_remote_stream_sdp (NiceAgent *agent, guint stream_id, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (sdp != NULL, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -6372,7 +6443,7 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id, g_return_val_if_fail (agent->reliable, NULL); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) goto done; @@ -6398,7 +6469,7 @@ nice_agent_forget_relays (NiceAgent *agent, guint stream_id, guint component_id) g_return_val_if_fail (stream_id >= 1, FALSE); g_return_val_if_fail (component_id >= 1, FALSE); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { ret = FALSE; @@ -6455,12 +6526,12 @@ nice_agent_get_component_state (NiceAgent *agent, NiceComponentState state = NICE_COMPONENT_STATE_FAILED; NiceComponent *component; - agent_lock (); + agent_lock (agent); if (agent_find_component (agent, stream_id, component_id, NULL, &component)) state = component->state; - agent_unlock (); + agent_unlock (agent); return state; } |