diff options
author | Juan Navarro <juan.navarro@gmx.es> | 2018-08-20 18:01:02 +0200 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2018-10-28 14:47:32 +0000 |
commit | da41258a21102f63ec5d5b2dc20d303f772eb195 (patch) | |
tree | ad9165967d92e82b57678e1d9d967bfe9767fa98 | |
parent | 78bdcfad5738d21b200ec283918dfd93e17b3d85 (diff) | |
download | libnice-da41258a21102f63ec5d5b2dc20d303f772eb195.tar.gz |
Use per-agent locks and GWeakRefs in callbacks from timeout sources
Work on libnice's bug #1 in Gitlab. This work is composed of multiple
merged parts:
- "Global lock contention removed"
Phabricator D1900: https://phabricator.freedesktop.org/D1900
By @nifigase
Opened in GitLab as Merge Request !12
- "agent: properly handle NiceAgent ref in callbacks from timeout
sources"
Phabricator D1898: https://phabricator.freedesktop.org/D1898
By @mparis
This patch was itself based upon a previous version of the work done in
D1900. After the switch of hosting, it got lost.
On top of these, additions to follow some review comments from @ocrete:
- https://phabricator.freedesktop.org/D1900#40412
- https://phabricator.freedesktop.org/D1898#39332
-rw-r--r-- | agent/agent-priv.h | 14 | ||||
-rw-r--r-- | agent/agent.c | 251 | ||||
-rw-r--r-- | agent/component.c | 4 | ||||
-rw-r--r-- | agent/conncheck.c | 192 | ||||
-rw-r--r-- | agent/conncheck.h | 1 | ||||
-rw-r--r-- | agent/discovery.c | 20 | ||||
-rw-r--r-- | agent/inputstream.c | 8 | ||||
-rw-r--r-- | agent/outputstream.c | 12 | ||||
-rw-r--r-- | socket/tcp-bsd.c | 12 | ||||
-rw-r--r-- | socket/udp-turn.c | 92 |
10 files changed, 281 insertions, 325 deletions
diff --git a/agent/agent-priv.h b/agent/agent-priv.h index 7269be0..bf67eb5 100644 --- a/agent/agent-priv.h +++ b/agent/agent-priv.h @@ -138,6 +138,8 @@ struct _NiceAgent { GObject parent; /* gobject pointer */ + GMutex agent_mutex; /* Mutex used for thread-safe lib */ + gboolean full_mode; /* property: full-mode */ gchar *stun_server_ip; /* property: STUN server IP */ guint stun_server_port; /* property: STUN server port */ @@ -208,8 +210,8 @@ NiceStream *agent_find_stream (NiceAgent *agent, guint stream_id); void agent_gathering_done (NiceAgent *agent); void agent_signal_gathering_done (NiceAgent *agent); -void agent_lock (void); -void agent_unlock (void); +void agent_lock (NiceAgent *agent); +void agent_unlock (NiceAgent *agent); void agent_unlock_and_emit (NiceAgent *agent); void agent_signal_new_selected_pair ( @@ -235,8 +237,14 @@ void agent_signal_initial_binding_request_received (NiceAgent *agent, NiceStream guint64 agent_candidate_pair_priority (NiceAgent *agent, NiceCandidate *local, NiceCandidate *remote); +typedef gboolean (*NiceTimeoutLockedCallback)(NiceAgent *agent, + gpointer user_data); void agent_timeout_add_with_context (NiceAgent *agent, GSource **out, - const gchar *name, guint interval, GSourceFunc function, gpointer data); + const gchar *name, guint interval, NiceTimeoutLockedCallback function, + gpointer data); +void agent_timeout_add_seconds_with_context (NiceAgent *agent, GSource **out, + const gchar *name, guint interval, NiceTimeoutLockedCallback function, + gpointer data); StunUsageIceCompatibility agent_to_ice_compatibility (NiceAgent *agent); StunUsageTurnCompatibility agent_to_turn_compatibility (NiceAgent *agent); diff --git a/agent/agent.c b/agent/agent.c index db3067e..7f444b9 100644 --- a/agent/agent.c +++ b/agent/agent.c @@ -140,8 +140,6 @@ enum static guint signals[N_SIGNALS]; -static GMutex agent_mutex; /* Mutex used for thread-safe lib */ - static void priv_stop_upnp (NiceAgent *agent); static void pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data); @@ -159,14 +157,14 @@ static void nice_agent_get_property (GObject *object, static void nice_agent_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec); -void agent_lock (void) +void agent_lock (NiceAgent *agent) { - g_mutex_lock (&agent_mutex); + g_mutex_lock (&agent->agent_mutex); } -void agent_unlock (void) +void agent_unlock (NiceAgent *agent) { - g_mutex_unlock (&agent_mutex); + g_mutex_unlock (&agent->agent_mutex); } static GType _nice_agent_stream_ids_get_type (void); @@ -208,7 +206,7 @@ agent_unlock_and_emit (NiceAgent *agent) queue = agent->pending_signals; g_queue_init (&agent->pending_signals); - agent_unlock (); + agent_unlock (agent); while ((sig = g_queue_pop_head (&queue))) { g_signal_emitv (sig->params, sig->signal_id, 0, NULL); @@ -1185,6 +1183,8 @@ nice_agent_init (NiceAgent *agent) priv_generate_tie_breaker (agent); g_queue_init (&agent->pending_signals); + + g_mutex_init (&agent->agent_mutex); } @@ -1241,7 +1241,7 @@ nice_agent_get_property ( { NiceAgent *agent = NICE_AGENT (object); - agent_lock(); + agent_lock (agent); switch (property_id) { @@ -1441,7 +1441,7 @@ nice_agent_set_property ( { NiceAgent *agent = NICE_AGENT (object); - agent_lock(); + agent_lock (agent); switch (property_id) { @@ -1977,29 +1977,17 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *psocket, static gboolean -notify_pseudo_tcp_socket_clock (gpointer user_data) +notify_pseudo_tcp_socket_clock_agent_locked (NiceAgent *agent, + gpointer user_data) { NiceComponent *component = user_data; NiceStream *stream; - NiceAgent *agent; - - agent_lock(); stream = component->stream; - agent = component->agent; - - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in notify_pseudo_tcp_socket_clock"); - agent_unlock (); - return FALSE; - } pseudo_tcp_socket_notify_clock (component->tcp); adjust_tcp_clock (agent, stream, component); - agent_unlock_and_emit (agent); - return G_SOURCE_CONTINUE; } @@ -2023,7 +2011,7 @@ adjust_tcp_clock (NiceAgent *agent, NiceStream *stream, NiceComponent *component interval = G_MAXINT; agent_timeout_add_with_context (agent, &component->tcp_clock, "Pseudo-TCP clock", interval, - notify_pseudo_tcp_socket_clock, component); + notify_pseudo_tcp_socket_clock_agent_locked, component); } } } else { @@ -2042,13 +2030,13 @@ _tcp_sock_is_writable (NiceSocket *sock, gpointer user_data) NiceAgent *agent = component->agent; NiceStream *stream = component->stream; - agent_lock (); + agent_lock (agent); /* Don't signal writable if the socket that has become writable is not * the selected pair */ if (component->selected_pair.local == NULL || !nice_socket_is_based_on (component->selected_pair.local->sockptr, sock)) { - agent_unlock (); + agent_unlock (agent); return; } @@ -2614,7 +2602,7 @@ nice_agent_add_stream ( g_return_val_if_fail (NICE_IS_AGENT (agent), 0); g_return_val_if_fail (n_components >= 1, 0); - agent_lock(); + agent_lock (agent); stream = nice_stream_new (n_components, agent); agent->streams = g_slist_append (agent->streams, stream); @@ -2663,7 +2651,7 @@ nice_agent_set_relay_info(NiceAgent *agent, g_return_val_if_fail (password, FALSE); g_return_val_if_fail (type <= NICE_RELAY_TYPE_TURN_TLS, FALSE); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -2716,18 +2704,9 @@ nice_agent_set_relay_info(NiceAgent *agent, static void agent_check_upnp_gathering_done (NiceAgent *agent); -static gboolean priv_upnp_timeout_cb (gpointer user_data) +static gboolean priv_upnp_timeout_cb_agent_locked (NiceAgent *agent, + gpointer user_data) { - NiceAgent *agent = (NiceAgent*)user_data; - - agent_lock(); - - /* If the source has been destroyed, we have already freed all mappings. */ - if (g_source_is_destroyed (g_main_current_source ())) { - agent_unlock (); - return FALSE; - } - nice_debug ("Agent %p : UPnP port mapping timed out", agent); /* We cannot free priv->upnp here as it may be holding mappings open which @@ -2737,7 +2716,6 @@ static gboolean priv_upnp_timeout_cb (gpointer user_data) agent_check_upnp_gathering_done (agent); - agent_unlock_and_emit (agent); return FALSE; } @@ -2772,7 +2750,7 @@ static void _upnp_mapped_external_port (GUPnPSimpleIgd *self, gchar *proto, NiceCandidateTransport transport; GSList *i, *j, *k; - agent_lock(); + agent_lock (agent); if (agent->upnp_timer_source == NULL) goto end; @@ -2842,7 +2820,7 @@ static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error, NiceAddress localaddr; GSList *i; - agent_lock(); + agent_lock (agent); nice_debug ("Agent %p : Error mapping %s:%d to %d (%d) : %s", agent, local_ip, local_port, external_port, error->domain, error->message); @@ -2880,7 +2858,7 @@ nice_agent_gather_candidates ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -3047,7 +3025,8 @@ nice_agent_gather_candidates ( agent->upnp_mapping = g_slist_prepend (agent->upnp_mapping, base_addr); agent_timeout_add_with_context (agent, &agent->upnp_timer_source, - "UPnP timeout", agent->upnp_timeout, priv_upnp_timeout_cb, agent); + "UPnP timeout", agent->upnp_timeout, + priv_upnp_timeout_cb_agent_locked, agent); } #endif @@ -3221,7 +3200,7 @@ nice_agent_remove_stream ( g_return_if_fail (NICE_IS_AGENT (agent)); g_return_if_fail (stream_id >= 1); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (!stream) { @@ -3265,7 +3244,7 @@ nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id g_return_if_fail (stream_id >= 1); g_return_if_fail (component_id >= 1); - agent_lock(); + agent_lock (agent); if (agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -3288,7 +3267,7 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr) g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (addr != NULL, FALSE); - agent_lock(); + agent_lock (agent); dupaddr = nice_address_dup (addr); nice_address_set_port (dupaddr, 0); @@ -3489,7 +3468,7 @@ nice_agent_set_remote_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); /* note: oddly enough, ufrag and pwd can be empty strings */ @@ -3522,7 +3501,7 @@ nice_agent_set_local_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock (); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); @@ -3553,7 +3532,7 @@ nice_agent_get_local_credentials ( g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE); g_return_val_if_fail (stream_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -3623,7 +3602,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo nice_debug ("Agent %p: set_remote_candidates %d %d", agent, stream_id, component_id); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4422,7 +4401,7 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent, } } - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4516,9 +4495,9 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent, memcpy (&prev_recv_messages_iter, &component->recv_messages_iter, sizeof (NiceInputMessageIter)); - agent_unlock (); + agent_unlock (agent); g_main_context_iteration (context, blocking); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4709,7 +4688,7 @@ nice_agent_send_messages_nonblocking_internal ( g_assert (n_messages == 1 || !allow_partial); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -4956,7 +4935,7 @@ nice_agent_get_local_candidates ( g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { goto done; @@ -4990,7 +4969,7 @@ nice_agent_get_remote_candidates ( g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { goto done; @@ -5010,7 +4989,7 @@ nice_agent_restart ( { GSList *i; - agent_lock(); + agent_lock (agent); /* step: regenerate tie-breaker value */ priv_generate_tie_breaker (agent); @@ -5040,7 +5019,7 @@ nice_agent_restart_stream ( gboolean res = FALSE; NiceStream *stream; - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (!stream) { @@ -5066,6 +5045,8 @@ nice_agent_dispose (GObject *object) QueuedSignal *sig; NiceAgent *agent = NICE_AGENT (object); + agent_lock (agent); + /* step: free resources for the binding discovery timers */ discovery_free (agent); g_assert (agent->discovery_list == NULL); @@ -5131,6 +5112,10 @@ nice_agent_dispose (GObject *object) g_main_context_unref (agent->main_context); agent->main_context = NULL; + agent_unlock (agent); + + g_mutex_clear (&agent->agent_mutex); + if (G_OBJECT_CLASS (nice_agent_parent_class)->dispose) G_OBJECT_CLASS (nice_agent_parent_class)->dispose (object); @@ -5146,20 +5131,20 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data) gboolean has_io_callback; gboolean remove_source = FALSE; - agent_lock (); + component = socket_source->component; + agent = component->agent; + stream = component->stream; + + agent_lock (agent); if (g_source_is_destroyed (g_main_current_source ())) { /* Silently return FALSE. */ nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ()); - agent_unlock (); + agent_unlock (agent); return G_SOURCE_REMOVE; } - component = socket_source->component; - agent = component->agent; - stream = component->stream; - g_object_ref (agent); /* Remove disconnected sockets when we get a HUP */ @@ -5176,7 +5161,7 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data) } nice_component_remove_socket (component, socket_source->socket); - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); return G_SOURCE_REMOVE; } @@ -5349,7 +5334,7 @@ done: if (component->n_recv_messages == 0 && component->recv_messages == NULL) { agent_unlock_and_emit (agent); } else { - agent_unlock (); + agent_unlock (agent); } g_object_unref (agent); @@ -5380,7 +5365,7 @@ nice_agent_attach_recv ( g_return_val_if_fail (stream_id >= 1, FALSE); g_return_val_if_fail (component_id >= 1, FALSE); - agent_lock(); + agent_lock (agent); /* attach candidates */ @@ -5435,7 +5420,7 @@ nice_agent_set_selected_pair ( g_return_val_if_fail (lfoundation, FALSE); g_return_val_if_fail (rfoundation, FALSE); - agent_lock(); + agent_lock (agent); /* step: check that params specify an existing pair */ if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -5497,7 +5482,7 @@ nice_agent_get_selected_pair (NiceAgent *agent, guint stream_id, g_return_val_if_fail (local != NULL, FALSE); g_return_val_if_fail (remote != NULL, FALSE); - agent_lock(); + agent_lock (agent); /* step: check that params specify an existing pair */ if (!agent_find_component (agent, stream_id, component_id, @@ -5529,7 +5514,7 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock(); + agent_lock (agent); /* Reliable streams are pseudotcp or MUST use RFC 4571 framing */ if (agent->reliable) @@ -5560,16 +5545,82 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id, return g_socket; } +typedef struct _TimeoutData +{ + GWeakRef/*<NiceAgent>*/ agent_ref; + NiceTimeoutLockedCallback function; + gpointer user_data; +} TimeoutData; + +static void +timeout_data_destroy (TimeoutData *data) +{ + g_weak_ref_clear (&data->agent_ref); + g_slice_free (TimeoutData, data); +} + +static TimeoutData * +timeout_data_new (NiceAgent *agent, NiceTimeoutLockedCallback function, + gpointer user_data) +{ + TimeoutData *data = g_slice_new0 (TimeoutData); + + g_weak_ref_init (&data->agent_ref, agent); + data->function = function; + data->user_data = user_data; + + return data; +} + +static gboolean +timeout_cb (gpointer user_data) +{ + TimeoutData *data = user_data; + NiceAgent *agent; + gboolean ret = G_SOURCE_REMOVE; + + agent = g_weak_ref_get (&data->agent_ref); + if (agent == NULL) { + return G_SOURCE_REMOVE; + } + + agent_lock (agent); + + /* A race condition might happen where the mutex above waits for the lock + * and in the meantime another thread destroys the source. + * In that case, we don't need to run the function since it should + * have been cancelled */ + if (g_source_is_destroyed (g_main_current_source ())) { + nice_debug ("Source was destroyed. Avoided race condition in timeout_cb"); + + agent_unlock (agent); + goto end; + } + + ret = data->function (agent, data->user_data); + + agent_unlock_and_emit (agent); + + end: + g_object_unref (agent); + + return ret; +} + /* Create a new timer GSource with the given @name, @interval, callback * @function and @data, and assign it to @out, destroying and freeing any * existing #GSource in @out first. * * This guarantees that a timer won’t be overwritten without being destroyed. + * + * @interval is given in milliseconds. */ -void agent_timeout_add_with_context (NiceAgent *agent, GSource **out, - const gchar *name, guint interval, GSourceFunc function, gpointer data) +static void agent_timeout_add_with_context_internal (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, gboolean seconds, + NiceTimeoutLockedCallback function, gpointer user_data) { GSource *source; + TimeoutData *data; g_return_if_fail (function != NULL); g_return_if_fail (out != NULL); @@ -5582,16 +5633,36 @@ void agent_timeout_add_with_context (NiceAgent *agent, GSource **out, } /* Create the new source. */ - source = g_timeout_source_new (interval); + if (seconds) + source = g_timeout_source_new_seconds (interval); + else + source = g_timeout_source_new (interval); g_source_set_name (source, name); - g_source_set_callback (source, function, data, NULL); + data = timeout_data_new (agent, function, user_data); + g_source_set_callback (source, timeout_cb, data, + (GDestroyNotify)timeout_data_destroy); g_source_attach (source, agent->main_context); /* Return it! */ *out = source; } +void agent_timeout_add_with_context (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, + NiceTimeoutLockedCallback function, gpointer user_data) +{ + agent_timeout_add_with_context_internal (agent, out, name, interval, FALSE, + function, user_data); +} + +void agent_timeout_add_seconds_with_context (NiceAgent *agent, + GSource **out, const gchar *name, guint interval, + NiceTimeoutLockedCallback function, gpointer user_data) +{ + agent_timeout_add_with_context_internal (agent, out, name, interval, TRUE, + function, user_data); +} NICEAPI_EXPORT gboolean nice_agent_set_selected_remote_candidate ( @@ -5612,7 +5683,7 @@ nice_agent_set_selected_remote_candidate ( g_return_val_if_fail (component_id != 0, FALSE); g_return_val_if_fail (candidate != NULL, FALSE); - agent_lock(); + agent_lock (agent); /* step: check if the component exists*/ if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) { @@ -5701,7 +5772,7 @@ nice_agent_set_stream_tos (NiceAgent *agent, g_return_if_fail (NICE_IS_AGENT (agent)); g_return_if_fail (stream_id >= 1); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -5727,7 +5798,7 @@ nice_agent_set_software (NiceAgent *agent, const gchar *software) { g_return_if_fail (NICE_IS_AGENT (agent)); - agent_lock(); + agent_lock (agent); g_free (agent->software_attribute); if (software) @@ -5762,7 +5833,7 @@ nice_agent_set_stream_name (NiceAgent *agent, guint stream_id, " are valid", name); } - agent_lock(); + agent_lock (agent); for (i = agent->streams; i; i = i->next) { NiceStream *stream = i->data; @@ -5797,7 +5868,7 @@ nice_agent_get_stream_name (NiceAgent *agent, guint stream_id) g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (stream_id >= 1, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -5871,7 +5942,7 @@ nice_agent_get_default_local_candidate (NiceAgent *agent, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (component_id >= 1, NULL); - agent_lock (); + agent_lock (agent); /* step: check if the component exists*/ if (!agent_find_component (agent, stream_id, component_id, @@ -6030,7 +6101,7 @@ nice_agent_generate_local_sdp (NiceAgent *agent) g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); - agent_lock(); + agent_lock (agent); for (i = agent->streams; i; i = i->next) { NiceStream *stream = i->data; @@ -6054,7 +6125,7 @@ nice_agent_generate_local_stream_sdp (NiceAgent *agent, guint stream_id, g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (stream_id >= 1, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) @@ -6079,7 +6150,7 @@ nice_agent_generate_local_candidate_sdp (NiceAgent *agent, g_return_val_if_fail (NICE_IS_AGENT (agent), NULL); g_return_val_if_fail (candidate != NULL, NULL); - agent_lock(); + agent_lock (agent); sdp = g_string_new (NULL); _generate_candidate_sdp (agent, candidate, sdp); @@ -6101,7 +6172,7 @@ nice_agent_parse_remote_sdp (NiceAgent *agent, const gchar *sdp) g_return_val_if_fail (NICE_IS_AGENT (agent), -1); g_return_val_if_fail (sdp != NULL, -1); - agent_lock(); + agent_lock (agent); for (l = agent->streams; l; l = l->next) { NiceStream *stream = l->data; @@ -6193,7 +6264,7 @@ nice_agent_parse_remote_stream_sdp (NiceAgent *agent, guint stream_id, g_return_val_if_fail (stream_id >= 1, NULL); g_return_val_if_fail (sdp != NULL, NULL); - agent_lock(); + agent_lock (agent); stream = agent_find_stream (agent, stream_id); if (stream == NULL) { @@ -6372,7 +6443,7 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id, g_return_val_if_fail (agent->reliable, NULL); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) goto done; @@ -6398,7 +6469,7 @@ nice_agent_forget_relays (NiceAgent *agent, guint stream_id, guint component_id) g_return_val_if_fail (stream_id >= 1, FALSE); g_return_val_if_fail (component_id >= 1, FALSE); - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) { ret = FALSE; @@ -6455,12 +6526,12 @@ nice_agent_get_component_state (NiceAgent *agent, NiceComponentState state = NICE_COMPONENT_STATE_FAILED; NiceComponent *component; - agent_lock (); + agent_lock (agent); if (agent_find_component (agent, stream_id, component_id, NULL, &component)) state = component->state; - agent_unlock (); + agent_unlock (agent); return state; } diff --git a/agent/component.c b/agent/component.c index 84491e3..dfdba13 100644 --- a/agent/component.c +++ b/agent/component.c @@ -894,7 +894,7 @@ nice_component_emit_io_callback (NiceComponent *component, agent_unlock_and_emit (agent); io_callback (agent, stream_id, component_id, buf_len, (gchar *) buf, io_user_data); - agent_lock (); + agent_lock (agent); } else { IOCallbackData *data; @@ -1210,7 +1210,7 @@ component_source_prepare (GSource *source, gint *timeout_) return FALSE; /* Needed due to accessing the Component. */ - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, component_source->stream_id, component_source->component_id, NULL, diff --git a/agent/conncheck.c b/agent/conncheck.c index d267429..e2edebd 100644 --- a/agent/conncheck.c +++ b/agent/conncheck.c @@ -72,7 +72,8 @@ static size_t priv_create_username (NiceAgent *agent, NiceStream *stream, uint8_t *dest, guint dest_len, gboolean inbound); static size_t priv_get_password (NiceAgent *agent, NiceStream *stream, NiceCandidate *remote, uint8_t **password); -static void conn_check_free_item (gpointer data); +static void candidate_check_pair_free (NiceAgent *agent, + CandidateCheckPair *pair); static CandidateCheckPair *priv_conn_check_add_for_candidate_pair_matched ( NiceAgent *agent, guint stream_id, NiceComponent *component, NiceCandidate *local, NiceCandidate *remote, NiceCheckState initial_state); @@ -1040,7 +1041,8 @@ conn_check_stop (NiceAgent *agent) * * @return will return FALSE when no more pending timers. */ -static gboolean priv_conn_check_tick_unlocked (NiceAgent *agent) +static gboolean priv_conn_check_tick_agent_locked (NiceAgent *agent, + gpointer user_data) { CandidateCheckPair *pair = NULL; gboolean keep_timer_going = FALSE; @@ -1150,42 +1152,11 @@ static gboolean priv_conn_check_tick_unlocked (NiceAgent *agent) return TRUE; } -static gboolean priv_conn_check_tick (gpointer pointer) -{ - gboolean ret; - NiceAgent *agent = pointer; - - agent_lock(); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_conn_check_tick"); - agent_unlock (); - return FALSE; - } - - ret = priv_conn_check_tick_unlocked (agent); - agent_unlock_and_emit (agent); - - return ret; -} - -static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer) +static gboolean priv_conn_keepalive_retransmissions_tick_agent_locked ( + NiceAgent *agent, gpointer pointer) { CandidatePair *pair = (CandidatePair *) pointer; - agent_lock(); - - /* A race condition might happen where the mutex above waits for the lock - * and in the meantime another thread destroys the source. - * In that case, we don't need to run our retransmission tick since it should - * have been cancelled */ - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_conn_keepalive_retransmissions_tick"); - agent_unlock (); - return FALSE; - } - g_source_destroy (pair->keepalive.tick_source); g_source_unref (pair->keepalive.tick_source); pair->keepalive.tick_source = NULL; @@ -1197,12 +1168,11 @@ static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer) StunTransactionId id; NiceComponent *component; - if (!agent_find_component (pair->keepalive.agent, + if (!agent_find_component (agent, pair->keepalive.stream_id, pair->keepalive.component_id, NULL, &component)) { nice_debug ("Could not find stream or component in" " priv_conn_keepalive_retransmissions_tick"); - agent_unlock (); return FALSE; } @@ -1213,11 +1183,11 @@ static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer) if (pair->keepalive.agent->media_after_tick) { nice_debug ("Agent %p : Keepalive conncheck timed out!! " "but media was received. Suspecting keepalive lost because of " - "network bottleneck", pair->keepalive.agent); + "network bottleneck", agent); } else { nice_debug ("Agent %p : Keepalive conncheck timed out!! " - "peer probably lost connection", pair->keepalive.agent); - agent_signal_component_state_change (pair->keepalive.agent, + "peer probably lost connection", agent); + agent_signal_component_state_change (agent, pair->keepalive.stream_id, pair->keepalive.component_id, NICE_COMPONENT_STATE_FAILED); } @@ -1230,25 +1200,23 @@ static gboolean priv_conn_keepalive_retransmissions_tick (gpointer pointer) (gchar *)pair->keepalive.stun_buffer); nice_debug ("Agent %p : Retransmitting keepalive conncheck", - pair->keepalive.agent); - agent_timeout_add_with_context (pair->keepalive.agent, + agent); + agent_timeout_add_with_context (agent, &pair->keepalive.tick_source, "Pair keepalive", stun_timer_remainder (&pair->keepalive.timer), - priv_conn_keepalive_retransmissions_tick, pair); + priv_conn_keepalive_retransmissions_tick_agent_locked, pair); break; case STUN_USAGE_TIMER_RETURN_SUCCESS: - agent_timeout_add_with_context (pair->keepalive.agent, + agent_timeout_add_with_context (agent, &pair->keepalive.tick_source, "Pair keepalive", stun_timer_remainder (&pair->keepalive.timer), - priv_conn_keepalive_retransmissions_tick, pair); + priv_conn_keepalive_retransmissions_tick_agent_locked, pair); break; default: g_assert_not_reached(); break; } - - agent_unlock_and_emit (pair->keepalive.agent); return FALSE; } @@ -1406,7 +1374,7 @@ static gboolean priv_conn_keepalive_tick_unlocked (NiceAgent *agent) agent_timeout_add_with_context (p->keepalive.agent, &p->keepalive.tick_source, "Pair keepalive", stun_timer_remainder (&p->keepalive.timer), - priv_conn_keepalive_retransmissions_tick, p); + priv_conn_keepalive_retransmissions_tick_agent_locked, p); } else { ++errors; } @@ -1486,19 +1454,11 @@ static gboolean priv_conn_keepalive_tick_unlocked (NiceAgent *agent) return ret; } -static gboolean priv_conn_keepalive_tick (gpointer pointer) +static gboolean priv_conn_keepalive_tick_agent_locked (NiceAgent *agent, + gpointer pointer) { - NiceAgent *agent = pointer; gboolean ret; - agent_lock(); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_conn_keepalive_tick"); - agent_unlock (); - return FALSE; - } - ret = priv_conn_keepalive_tick_unlocked (agent); if (ret == FALSE) { if (agent->keepalive_timer_source) { @@ -1507,36 +1467,20 @@ static gboolean priv_conn_keepalive_tick (gpointer pointer) agent->keepalive_timer_source = NULL; } } - agent_unlock_and_emit (agent); + return ret; } -static gboolean priv_turn_allocate_refresh_retransmissions_tick (gpointer pointer) +static gboolean priv_turn_allocate_refresh_retransmissions_tick_agent_locked ( + NiceAgent *agent, gpointer pointer) { CandidateRefresh *cand = (CandidateRefresh *) pointer; - NiceAgent *agent = NULL; - - agent_lock(); - - /* A race condition might happen where the mutex above waits for the lock - * and in the meantime another thread destroys the source. - * In that case, we don't need to run our retransmission tick since it should - * have been cancelled */ - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_turn_allocate_refresh_retransmissions_tick"); - agent_unlock (); - return FALSE; - } - g_source_destroy (cand->tick_source); g_source_unref (cand->tick_source); cand->tick_source = NULL; - agent = g_object_ref (cand->agent); - switch (stun_timer_refresh (&cand->timer)) { case STUN_USAGE_TIMER_RETURN_TIMEOUT: { @@ -1556,27 +1500,23 @@ static gboolean priv_turn_allocate_refresh_retransmissions_tick (gpointer pointe agent_timeout_add_with_context (agent, &cand->tick_source, "Candidate TURN refresh", stun_timer_remainder (&cand->timer), - priv_turn_allocate_refresh_retransmissions_tick, cand); + priv_turn_allocate_refresh_retransmissions_tick_agent_locked, cand); break; case STUN_USAGE_TIMER_RETURN_SUCCESS: agent_timeout_add_with_context (agent, &cand->tick_source, "Candidate TURN refresh", stun_timer_remainder (&cand->timer), - priv_turn_allocate_refresh_retransmissions_tick, cand); + priv_turn_allocate_refresh_retransmissions_tick_agent_locked, cand); break; default: /* Nothing to do. */ break; } - - agent_unlock_and_emit (agent); - - g_object_unref (agent); - - return FALSE; + return G_SOURCE_REMOVE; } -static void priv_turn_allocate_refresh_tick_unlocked (CandidateRefresh *cand) +static void priv_turn_allocate_refresh_tick_unlocked (NiceAgent *agent, + CandidateRefresh *cand) { uint8_t *username; gsize username_len; @@ -1584,7 +1524,7 @@ static void priv_turn_allocate_refresh_tick_unlocked (CandidateRefresh *cand) gsize password_len; size_t buffer_len = 0; StunUsageTurnCompatibility turn_compat = - agent_to_turn_compatibility (cand->agent); + agent_to_turn_compatibility (agent); username = (uint8_t *)cand->candidate->turn->username; username_len = (size_t) strlen (cand->candidate->turn->username); @@ -1610,7 +1550,7 @@ static void priv_turn_allocate_refresh_tick_unlocked (CandidateRefresh *cand) g_free (password); } - nice_debug ("Agent %p : Sending allocate Refresh %zd", cand->agent, + nice_debug ("Agent %p : Sending allocate Refresh %zd", agent, buffer_len); if (cand->tick_source != NULL) { @@ -1621,16 +1561,16 @@ static void priv_turn_allocate_refresh_tick_unlocked (CandidateRefresh *cand) if (buffer_len > 0) { stun_timer_start (&cand->timer, - cand->agent->stun_initial_timeout, - cand->agent->stun_max_retransmissions); + agent->stun_initial_timeout, + agent->stun_max_retransmissions); /* send the refresh */ agent_socket_send (cand->nicesock, &cand->server, buffer_len, (gchar *)cand->stun_buffer); - agent_timeout_add_with_context (cand->agent, &cand->tick_source, + agent_timeout_add_with_context (agent, &cand->tick_source, "Candidate TURN refresh", stun_timer_remainder (&cand->timer), - priv_turn_allocate_refresh_retransmissions_tick, cand); + priv_turn_allocate_refresh_retransmissions_tick_agent_locked, cand); } } @@ -1643,22 +1583,14 @@ static void priv_turn_allocate_refresh_tick_unlocked (CandidateRefresh *cand) * * @return will return FALSE when no more pending timers. */ -static gboolean priv_turn_allocate_refresh_tick (gpointer pointer) +static gboolean priv_turn_allocate_refresh_tick_agent_locked (NiceAgent *agent, + gpointer pointer) { CandidateRefresh *cand = (CandidateRefresh *) pointer; - agent_lock(); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_turn_allocate_refresh_tick"); - agent_unlock (); - return FALSE; - } - - priv_turn_allocate_refresh_tick_unlocked (cand); - agent_unlock_and_emit (cand->agent); + priv_turn_allocate_refresh_tick_unlocked (agent, cand); - return FALSE; + return G_SOURCE_REMOVE; } @@ -1674,14 +1606,14 @@ void conn_check_schedule_next (NiceAgent *agent) if (agent->conncheck_timer_source == NULL) { agent_timeout_add_with_context (agent, &agent->conncheck_timer_source, "Connectivity check schedule", agent->timer_ta, - priv_conn_check_tick, agent); + priv_conn_check_tick_agent_locked, NULL); } /* step: also start the keepalive timer */ if (agent->keepalive_timer_source == NULL) { agent_timeout_add_with_context (agent, &agent->keepalive_timer_source, "Connectivity keepalive timeout", NICE_AGENT_TIMER_TR_DEFAULT, - priv_conn_keepalive_tick, agent); + priv_conn_keepalive_tick_agent_locked, NULL); } } @@ -1753,7 +1685,8 @@ void conn_check_remote_credentials_set(NiceAgent *agent, NiceStream *stream) * in ICE spec section 5.7.3 (ID-19). See also * conn_check_add_for_candidate(). */ -static GSList *priv_limit_conn_check_list_size (GSList *conncheck_list, guint upper_limit) +static GSList *priv_limit_conn_check_list_size (NiceAgent *agent, + GSList *conncheck_list, guint upper_limit) { guint valid = 0; guint cancelled = 0; @@ -1765,7 +1698,7 @@ static GSList *priv_limit_conn_check_list_size (GSList *conncheck_list, guint up valid++; if (valid > upper_limit) { - conn_check_free_item (pair); + candidate_check_pair_free (agent, pair); conncheck_list = g_slist_delete_link (conncheck_list, item); cancelled++; } @@ -1859,7 +1792,6 @@ static void priv_update_check_list_failed_components (NiceAgent *agent, NiceStre for (i = stream->conncheck_list; i; i = i->next) { CandidateCheckPair *p = i->data; - g_assert (p->agent == agent); g_assert (p->stream_id == stream->id); if (p->component_id == (c + 1)) { @@ -2055,7 +1987,6 @@ static CandidateCheckPair *priv_add_new_check_pair (NiceAgent *agent, stream = agent_find_stream (agent, stream_id); pair = g_slice_new0 (CandidateCheckPair); - pair->agent = agent; pair->stream_id = stream_id; pair->component_id = component->id;; pair->local = local; @@ -2095,7 +2026,7 @@ static CandidateCheckPair *priv_add_new_check_pair (NiceAgent *agent, /* implement the hard upper limit for number of checks (see sect 5.7.3 ICE ID-19): */ if (agent->compatibility == NICE_COMPATIBILITY_RFC5245) { - stream->conncheck_list = priv_limit_conn_check_list_size ( + stream->conncheck_list = priv_limit_conn_check_list_size (agent, stream->conncheck_list, agent->max_conn_checks); } @@ -2271,12 +2202,10 @@ int conn_check_add_for_local_candidate (NiceAgent *agent, guint stream_id, NiceC * Frees the CandidateCheckPair structure pointer to * by 'user data'. Compatible with GDestroyNotify. */ -static void conn_check_free_item (gpointer data) +static void candidate_check_pair_free (NiceAgent *agent, + CandidateCheckPair *pair) { - CandidateCheckPair *pair = data; - - if (pair->agent) - priv_remove_pair_from_triggered_check_queue (pair->agent, pair); + priv_remove_pair_from_triggered_check_queue (agent, pair); priv_free_all_stun_transactions (pair, NULL); g_slice_free (CandidateCheckPair, pair); } @@ -2291,9 +2220,13 @@ void conn_check_free (NiceAgent *agent) NiceStream *stream = i->data; if (stream->conncheck_list) { + GSList *item; + nice_debug ("Agent %p, freeing conncheck_list of stream %p", agent, stream); - g_slist_free_full (stream->conncheck_list, conn_check_free_item); + for (item = stream->conncheck_list; item; item = item->next) + candidate_check_pair_free (agent, item->data); + g_slist_free (stream->conncheck_list); stream->conncheck_list = NULL; } } @@ -2313,9 +2246,13 @@ void conn_check_prune_stream (NiceAgent *agent, NiceStream *stream) gboolean keep_going = FALSE; if (stream->conncheck_list) { + GSList *item; + nice_debug ("Agent %p, freeing conncheck_list of stream %p", agent, stream); - g_slist_free_full (stream->conncheck_list, conn_check_free_item); + for (item = stream->conncheck_list; item; item = item->next) + candidate_check_pair_free (agent, item->data); + g_slist_free (stream->conncheck_list); stream->conncheck_list = NULL; } @@ -2725,7 +2662,7 @@ static guint priv_prune_pending_checks (NiceAgent *agent, NiceStream *stream, gu if (p->component_id == component_id) { if (p->state == NICE_CHECK_FROZEN || p->state == NICE_CHECK_WAITING) { nice_debug ("Agent %p : pair %p removed.", agent, p); - conn_check_free_item (p); + candidate_check_pair_free (agent, p); stream->conncheck_list = g_slist_delete_link(stream->conncheck_list, i); } @@ -2972,7 +2909,6 @@ static CandidateCheckPair *priv_add_peer_reflexive_pair (NiceAgent *agent, guint CandidateCheckPair *pair = g_slice_new0 (CandidateCheckPair); NiceStream *stream = agent_find_stream (agent, stream_id); - pair->agent = agent; pair->stream_id = stream_id; pair->component_id = component->id;; pair->local = local_cand; @@ -3480,9 +3416,10 @@ priv_add_new_turn_refresh (CandidateDiscovery *cdisco, NiceCandidate *relay_cand /* step: also start the refresh timer */ /* refresh should be sent 1 minute before it expires */ - agent_timeout_add_with_context (agent, &cand->timer_source, + agent_timeout_add_seconds_with_context (agent, &cand->timer_source, "Candidate TURN refresh", - (lifetime - 60) * 1000, priv_turn_allocate_refresh_tick, cand); + lifetime - 60, priv_turn_allocate_refresh_tick_agent_locked, + cand); nice_debug ("timer source is : %p", cand->timer_source); @@ -3791,9 +3728,10 @@ static gboolean priv_map_reply_to_relay_refresh (NiceAgent *agent, StunMessage * agent, cand, (int)res); if (res == STUN_USAGE_TURN_RETURN_RELAY_SUCCESS) { /* refresh should be sent 1 minute before it expires */ - agent_timeout_add_with_context (cand->agent, &cand->timer_source, - "Candidate TURN refresh", (lifetime - 60) * 1000, - priv_turn_allocate_refresh_tick, cand); + agent_timeout_add_seconds_with_context (cand->agent, + &cand->timer_source, + "Candidate TURN refresh", lifetime - 60, + priv_turn_allocate_refresh_tick_agent_locked, cand); g_source_destroy (cand->tick_source); g_source_unref (cand->tick_source); @@ -3827,7 +3765,7 @@ static gboolean priv_map_reply_to_relay_refresh (NiceAgent *agent, StunMessage * stun_message_length (resp)); cand->stun_resp_msg.buffer = cand->stun_resp_buffer; cand->stun_resp_msg.buffer_len = sizeof(cand->stun_resp_buffer); - priv_turn_allocate_refresh_tick_unlocked (cand); + priv_turn_allocate_refresh_tick_unlocked (agent, cand); } else { /* case: a real unauthorized error */ refresh_cancel (cand); @@ -4352,7 +4290,7 @@ conn_check_prune_socket (NiceAgent *agent, NiceStream *stream, NiceComponent *co nice_debug ("Agent %p : Retransmissions failed, giving up on pair %p", agent, p); candidate_check_pair_fail (stream, agent, p); - conn_check_free_item (p); + candidate_check_pair_free (agent, p); stream->conncheck_list = g_slist_delete_link (stream->conncheck_list, l); } diff --git a/agent/conncheck.h b/agent/conncheck.h index 8cfe2d6..2e4bc45 100644 --- a/agent/conncheck.h +++ b/agent/conncheck.h @@ -83,7 +83,6 @@ struct _StunTransaction struct _CandidateCheckPair { - NiceAgent *agent; /* back pointer to owner */ guint stream_id; guint component_id; NiceCandidate *local; diff --git a/agent/discovery.c b/agent/discovery.c index 30a9539..ef27346 100644 --- a/agent/discovery.c +++ b/agent/discovery.c @@ -998,10 +998,9 @@ NiceCandidate *discovery_learn_remote_peer_reflexive_candidate ( * * @return will return FALSE when no more pending timers. */ -static gboolean priv_discovery_tick_unlocked (gpointer pointer) +static gboolean priv_discovery_tick_unlocked (NiceAgent *agent) { CandidateDiscovery *cand; - NiceAgent *agent = pointer; GSList *i; int not_done = 0; /* note: track whether to continue timer */ size_t buffer_len = 0; @@ -1183,20 +1182,12 @@ static gboolean priv_discovery_tick_unlocked (gpointer pointer) return TRUE; } -static gboolean priv_discovery_tick (gpointer pointer) +static gboolean priv_discovery_tick_agent_locked (NiceAgent *agent, + gpointer pointer) { - NiceAgent *agent = pointer; gboolean ret; - agent_lock(); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in priv_discovery_tick"); - agent_unlock (); - return FALSE; - } - - ret = priv_discovery_tick_unlocked (pointer); + ret = priv_discovery_tick_unlocked (agent); if (ret == FALSE) { if (agent->discovery_timer_source != NULL) { g_source_destroy (agent->discovery_timer_source); @@ -1204,7 +1195,6 @@ static gboolean priv_discovery_tick (gpointer pointer) agent->discovery_timer_source = NULL; } } - agent_unlock_and_emit (agent); return ret; } @@ -1227,7 +1217,7 @@ void discovery_schedule (NiceAgent *agent) if (res == TRUE) { agent_timeout_add_with_context (agent, &agent->discovery_timer_source, "Candidate discovery tick", agent->timer_ta, - priv_discovery_tick, agent); + priv_discovery_tick_agent_locked, NULL); } } } diff --git a/agent/inputstream.c b/agent/inputstream.c index 58a4a0d..eafac1b 100644 --- a/agent/inputstream.c +++ b/agent/inputstream.c @@ -341,7 +341,7 @@ nice_input_stream_close (GInputStream *stream, GCancellable *cancellable, if (agent == NULL) return TRUE; - agent_lock (); + agent_lock (agent); /* Shut down the read side of the pseudo-TCP stream, if it still exists. */ if (agent_find_component (agent, priv->stream_id, priv->component_id, @@ -350,7 +350,7 @@ nice_input_stream_close (GInputStream *stream, GCancellable *cancellable, pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_RD); } - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); @@ -376,7 +376,7 @@ nice_input_stream_is_readable (GPollableInputStream *stream) if (agent == NULL) return FALSE; - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, priv->stream_id, priv->component_id, &_stream, &component)) { @@ -405,7 +405,7 @@ nice_input_stream_is_readable (GPollableInputStream *stream) } done: - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); diff --git a/agent/outputstream.c b/agent/outputstream.c index 391ab33..7fae413 100644 --- a/agent/outputstream.c +++ b/agent/outputstream.c @@ -485,7 +485,7 @@ nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable, if (agent == NULL) return TRUE; - agent_lock (); + agent_lock (agent); /* Shut down the write side of the pseudo-TCP stream. */ if (agent_find_component (agent, priv->stream_id, priv->component_id, @@ -494,7 +494,7 @@ nice_output_stream_close (GOutputStream *stream, GCancellable *cancellable, pseudo_tcp_socket_shutdown (component->tcp, PSEUDO_TCP_SHUTDOWN_WR); } - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); @@ -519,7 +519,7 @@ nice_output_stream_is_writable (GPollableOutputStream *stream) if (agent == NULL) return FALSE; - agent_lock (); + agent_lock (agent); if (!agent_find_component (agent, priv->stream_id, priv->component_id, &_stream, &component)) { @@ -540,7 +540,7 @@ nice_output_stream_is_writable (GPollableOutputStream *stream) } done: - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); @@ -618,7 +618,7 @@ nice_output_stream_create_source (GPollableOutputStream *stream, if (agent == NULL) return component_source; - agent_lock (); + agent_lock (agent); /* Grab the socket for this component. */ if (!agent_find_component (agent, priv->stream_id, priv->component_id, @@ -638,7 +638,7 @@ nice_output_stream_create_source (GPollableOutputStream *stream, } done: - agent_unlock (); + agent_unlock (agent); g_object_unref (agent); diff --git a/socket/tcp-bsd.c b/socket/tcp-bsd.c index 285d323..d5dd633 100644 --- a/socket/tcp-bsd.c +++ b/socket/tcp-bsd.c @@ -60,6 +60,7 @@ #define TCP_NODELAY 1 typedef struct { + GMutex mutex; NiceAddress remote_addr; GQueue send_queue; GMainContext *context; @@ -101,6 +102,7 @@ nice_tcp_bsd_socket_new_from_gsock (GMainContext *ctx, GSocket *gsock, if (ctx == NULL) ctx = g_main_context_default (); + g_mutex_init (&priv->mutex); priv->context = g_main_context_ref (ctx); priv->remote_addr = *remote_addr; priv->error = FALSE; @@ -227,6 +229,8 @@ socket_close (NiceSocket *sock) if (priv->context) g_main_context_unref (priv->context); + g_mutex_clear (&priv->mutex); + g_slice_free(TcpPriv, sock->priv); } @@ -424,12 +428,12 @@ socket_send_more ( NiceSocket *sock = (NiceSocket *) data; TcpPriv *priv = sock->priv; - agent_lock (); + g_mutex_lock (&priv->mutex); if (g_source_is_destroyed (g_main_current_source ())) { nice_debug ("Source was destroyed. " "Avoided race condition in tcp-bsd.c:socket_send_more"); - agent_unlock (); + g_mutex_unlock (&priv->mutex); return FALSE; } @@ -441,7 +445,7 @@ socket_send_more ( g_source_unref (priv->io_source); priv->io_source = NULL; - agent_unlock (); + g_mutex_unlock (&priv->mutex); if (priv->writable_cb) priv->writable_cb (sock, priv->writable_data); @@ -449,6 +453,6 @@ socket_send_more ( return FALSE; } - agent_unlock (); + g_mutex_unlock (&priv->mutex); return TRUE; } diff --git a/socket/udp-turn.c b/socket/udp-turn.c index c6bd803..466ec32 100644 --- a/socket/udp-turn.c +++ b/socket/udp-turn.c @@ -71,6 +71,7 @@ typedef struct { } ChannelBinding; typedef struct { + GMutex mutex; GMainContext *ctx; StunAgent agent; GList *channels; @@ -144,7 +145,7 @@ static gboolean priv_send_channel_bind (UdpTurnPriv *priv, const NiceAddress *peer); static gboolean priv_add_channel_binding (UdpTurnPriv *priv, const NiceAddress *peer); -static gboolean priv_forget_send_request (gpointer pointer); +static gboolean priv_forget_send_request_agent_locked (gpointer pointer); static void priv_clear_permissions (UdpTurnPriv *priv); static guint @@ -209,6 +210,7 @@ nice_udp_turn_socket_new (GMainContext *ctx, NiceAddress *addr, STUN_AGENT_USAGE_NO_ALIGNED_ATTRIBUTES); } + g_mutex_init (&priv->mutex); priv->channels = NULL; priv->current_binding = NULL; priv->base_socket = base_socket; @@ -420,18 +422,16 @@ socket_recv_messages (NiceSocket *sock, return i; } +/* interval is given in milliseconds */ static GSource * priv_timeout_add_with_context (UdpTurnPriv *priv, guint interval, - gboolean seconds, GSourceFunc function, gpointer data) + GSourceFunc function, gpointer data) { - GSource *source; + GSource *source = NULL; g_return_val_if_fail (function != NULL, NULL); - if (seconds) - source = g_timeout_source_new_seconds (interval); - else - source = g_timeout_source_new (interval); + source = g_timeout_source_new (interval); g_source_set_callback (source, function, data, NULL); g_source_attach (source, priv->ctx); @@ -825,7 +825,7 @@ socket_send_message (NiceSocket *sock, const NiceAddress *to, req->priv = priv; stun_message_id (&msg, req->id); req->source = priv_timeout_add_with_context (priv, - STUN_END_TIMEOUT, FALSE, priv_forget_send_request, req); + STUN_END_TIMEOUT, priv_forget_send_request_agent_locked, req); g_queue_push_tail (priv->send_requests, req); } } @@ -962,19 +962,10 @@ socket_is_based_on (NiceSocket *sock, NiceSocket *other) } static gboolean -priv_forget_send_request (gpointer pointer) +priv_forget_send_request_agent_locked (gpointer pointer) { SendRequest *req = pointer; - agent_lock (); - - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in turn.c:priv_forget_send_request"); - agent_unlock (); - return FALSE; - } - stun_agent_forget_transaction (&req->priv->agent, req->id); g_queue_remove (req->priv->send_requests, req); @@ -983,8 +974,6 @@ priv_forget_send_request (gpointer pointer) g_source_unref (req->source); req->source = NULL; - agent_unlock (); - g_slice_free (SendRequest, req); return FALSE; @@ -997,11 +986,11 @@ priv_permission_timeout (gpointer data) nice_debug ("Permission is about to timeout, schedule renewal"); - agent_lock (); + g_mutex_lock (&priv->mutex); /* remove all permissions for this agent (the permission for the peer we are sending to will be renewed) */ priv_clear_permissions (priv); - agent_unlock (); + g_mutex_unlock (&priv->mutex); return TRUE; } @@ -1015,16 +1004,6 @@ priv_binding_expired_timeout (gpointer data) nice_debug ("Permission expired, refresh failed"); - agent_lock (); - - source = g_main_current_source (); - if (g_source_is_destroyed (source)) { - nice_debug ("Source was destroyed. " - "Avoided race condition in turn.c:priv_binding_expired_timeout"); - agent_unlock (); - return FALSE; - } - /* find current binding and destroy it */ for (i = priv->channels ; i; i = i->next) { ChannelBinding *b = i->data; @@ -1061,8 +1040,6 @@ priv_binding_expired_timeout (gpointer data) } } - agent_unlock (); - return FALSE; } @@ -1075,16 +1052,6 @@ priv_binding_timeout (gpointer data) nice_debug ("Permission is about to timeout, sending binding renewal"); - agent_lock (); - - source = g_main_current_source (); - if (g_source_is_destroyed (source)) { - nice_debug ("Source was destroyed. " - "Avoided race condition in turn.c:priv_binding_timeout"); - agent_unlock (); - return FALSE; - } - /* find current binding and mark it for renewal */ for (i = priv->channels ; i; i = i->next) { ChannelBinding *b = i->data; @@ -1099,7 +1066,7 @@ priv_binding_timeout (gpointer data) /* Install timer to expire the permission */ b->timeout_source = priv_timeout_add_with_context (priv, - STUN_EXPIRE_TIMEOUT, TRUE, priv_binding_expired_timeout, priv); + STUN_EXPIRE_TIMEOUT * 1000, priv_binding_expired_timeout, priv); /* Send renewal */ if (!priv->current_binding_msg) @@ -1108,8 +1075,6 @@ priv_binding_timeout (gpointer data) } } - agent_unlock (); - return FALSE; } @@ -1372,8 +1337,8 @@ nice_udp_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock, } /* Install timer to schedule refresh of the permission */ binding->timeout_source = - priv_timeout_add_with_context (priv, STUN_BINDING_TIMEOUT, - TRUE, priv_binding_timeout, priv); + priv_timeout_add_with_context (priv, + STUN_BINDING_TIMEOUT * 1000, priv_binding_timeout, priv); } priv_process_pending_bindings (priv); } @@ -1463,8 +1428,9 @@ nice_udp_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock, if (stun_message_get_class (&msg) == STUN_RESPONSE && !priv->permission_timeout_source) { priv->permission_timeout_source = - priv_timeout_add_with_context (priv, STUN_PERMISSION_TIMEOUT, - TRUE, priv_permission_timeout, priv); + priv_timeout_add_with_context (priv, + STUN_PERMISSION_TIMEOUT * 1000, priv_permission_timeout, + priv); } /* send enqued data */ @@ -1721,14 +1687,6 @@ priv_retransmissions_tick (gpointer pointer) { UdpTurnPriv *priv = pointer; - agent_lock (); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. " - "Avoided race condition in turn.c:priv_retransmissions_tick"); - agent_unlock (); - return FALSE; - } - if (priv_retransmissions_tick_unlocked (priv) == FALSE) { if (priv->tick_source_channel_bind != NULL) { g_source_destroy (priv->tick_source_channel_bind); @@ -1736,7 +1694,6 @@ priv_retransmissions_tick (gpointer pointer) priv->tick_source_channel_bind = NULL; } } - agent_unlock (); return FALSE; } @@ -1746,21 +1703,11 @@ priv_retransmissions_create_permission_tick (gpointer pointer) { UdpTurnPriv *priv = pointer; - agent_lock (); - if (g_source_is_destroyed (g_main_current_source ())) { - nice_debug ("Source was destroyed. Avoided race condition in " - "turn.c:priv_retransmissions_create_permission_tick"); - agent_unlock (); - return FALSE; - } - /* This will call priv_retransmissions_create_permission_tick_unlocked() for * every pending permission with an expired timer and will create a new timer * if there are pending permissions that require it */ priv_schedule_tick (priv); - agent_unlock (); - return FALSE; } @@ -1781,7 +1728,7 @@ priv_schedule_tick (UdpTurnPriv *priv) guint timeout = stun_timer_remainder (&priv->current_binding_msg->timer); if (timeout > 0) { priv->tick_source_channel_bind = - priv_timeout_add_with_context (priv, timeout, FALSE, + priv_timeout_add_with_context (priv, timeout, priv_retransmissions_tick, priv); } else { priv_retransmissions_tick_unlocked (priv); @@ -1819,8 +1766,7 @@ priv_schedule_tick (UdpTurnPriv *priv) /* We create one timer for the minimal timeout we need */ if (min_timeout != G_MAXUINT) { priv->tick_source_create_permission = - priv_timeout_add_with_context (priv, FALSE, - min_timeout, + priv_timeout_add_with_context (priv, min_timeout, priv_retransmissions_create_permission_tick, priv); } |