summaryrefslogtreecommitdiff
path: root/agent/agent.c
diff options
context:
space:
mode:
Diffstat (limited to 'agent/agent.c')
-rw-r--r--agent/agent.c251
1 files changed, 161 insertions, 90 deletions
diff --git a/agent/agent.c b/agent/agent.c
index db3067e..7f444b9 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -140,8 +140,6 @@ enum
static guint signals[N_SIGNALS];
-static GMutex agent_mutex; /* Mutex used for thread-safe lib */
-
static void priv_stop_upnp (NiceAgent *agent);
static void pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data);
@@ -159,14 +157,14 @@ static void nice_agent_get_property (GObject *object,
static void nice_agent_set_property (GObject *object,
guint property_id, const GValue *value, GParamSpec *pspec);
-void agent_lock (void)
+void agent_lock (NiceAgent *agent)
{
- g_mutex_lock (&agent_mutex);
+ g_mutex_lock (&agent->agent_mutex);
}
-void agent_unlock (void)
+void agent_unlock (NiceAgent *agent)
{
- g_mutex_unlock (&agent_mutex);
+ g_mutex_unlock (&agent->agent_mutex);
}
static GType _nice_agent_stream_ids_get_type (void);
@@ -208,7 +206,7 @@ agent_unlock_and_emit (NiceAgent *agent)
queue = agent->pending_signals;
g_queue_init (&agent->pending_signals);
- agent_unlock ();
+ agent_unlock (agent);
while ((sig = g_queue_pop_head (&queue))) {
g_signal_emitv (sig->params, sig->signal_id, 0, NULL);
@@ -1185,6 +1183,8 @@ nice_agent_init (NiceAgent *agent)
priv_generate_tie_breaker (agent);
g_queue_init (&agent->pending_signals);
+
+ g_mutex_init (&agent->agent_mutex);
}
@@ -1241,7 +1241,7 @@ nice_agent_get_property (
{
NiceAgent *agent = NICE_AGENT (object);
- agent_lock();
+ agent_lock (agent);
switch (property_id)
{
@@ -1441,7 +1441,7 @@ nice_agent_set_property (
{
NiceAgent *agent = NICE_AGENT (object);
- agent_lock();
+ agent_lock (agent);
switch (property_id)
{
@@ -1977,29 +1977,17 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *psocket,
static gboolean
-notify_pseudo_tcp_socket_clock (gpointer user_data)
+notify_pseudo_tcp_socket_clock_agent_locked (NiceAgent *agent,
+ gpointer user_data)
{
NiceComponent *component = user_data;
NiceStream *stream;
- NiceAgent *agent;
-
- agent_lock();
stream = component->stream;
- agent = component->agent;
-
- if (g_source_is_destroyed (g_main_current_source ())) {
- nice_debug ("Source was destroyed. "
- "Avoided race condition in notify_pseudo_tcp_socket_clock");
- agent_unlock ();
- return FALSE;
- }
pseudo_tcp_socket_notify_clock (component->tcp);
adjust_tcp_clock (agent, stream, component);
- agent_unlock_and_emit (agent);
-
return G_SOURCE_CONTINUE;
}
@@ -2023,7 +2011,7 @@ adjust_tcp_clock (NiceAgent *agent, NiceStream *stream, NiceComponent *component
interval = G_MAXINT;
agent_timeout_add_with_context (agent, &component->tcp_clock,
"Pseudo-TCP clock", interval,
- notify_pseudo_tcp_socket_clock, component);
+ notify_pseudo_tcp_socket_clock_agent_locked, component);
}
}
} else {
@@ -2042,13 +2030,13 @@ _tcp_sock_is_writable (NiceSocket *sock, gpointer user_data)
NiceAgent *agent = component->agent;
NiceStream *stream = component->stream;
- agent_lock ();
+ agent_lock (agent);
/* Don't signal writable if the socket that has become writable is not
* the selected pair */
if (component->selected_pair.local == NULL ||
!nice_socket_is_based_on (component->selected_pair.local->sockptr, sock)) {
- agent_unlock ();
+ agent_unlock (agent);
return;
}
@@ -2614,7 +2602,7 @@ nice_agent_add_stream (
g_return_val_if_fail (NICE_IS_AGENT (agent), 0);
g_return_val_if_fail (n_components >= 1, 0);
- agent_lock();
+ agent_lock (agent);
stream = nice_stream_new (n_components, agent);
agent->streams = g_slist_append (agent->streams, stream);
@@ -2663,7 +2651,7 @@ nice_agent_set_relay_info(NiceAgent *agent,
g_return_val_if_fail (password, FALSE);
g_return_val_if_fail (type <= NICE_RELAY_TYPE_TURN_TLS, FALSE);
- agent_lock();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id, &stream,
&component)) {
@@ -2716,18 +2704,9 @@ nice_agent_set_relay_info(NiceAgent *agent,
static void agent_check_upnp_gathering_done (NiceAgent *agent);
-static gboolean priv_upnp_timeout_cb (gpointer user_data)
+static gboolean priv_upnp_timeout_cb_agent_locked (NiceAgent *agent,
+ gpointer user_data)
{
- NiceAgent *agent = (NiceAgent*)user_data;
-
- agent_lock();
-
- /* If the source has been destroyed, we have already freed all mappings. */
- if (g_source_is_destroyed (g_main_current_source ())) {
- agent_unlock ();
- return FALSE;
- }
-
nice_debug ("Agent %p : UPnP port mapping timed out", agent);
/* We cannot free priv->upnp here as it may be holding mappings open which
@@ -2737,7 +2716,6 @@ static gboolean priv_upnp_timeout_cb (gpointer user_data)
agent_check_upnp_gathering_done (agent);
- agent_unlock_and_emit (agent);
return FALSE;
}
@@ -2772,7 +2750,7 @@ static void _upnp_mapped_external_port (GUPnPSimpleIgd *self, gchar *proto,
NiceCandidateTransport transport;
GSList *i, *j, *k;
- agent_lock();
+ agent_lock (agent);
if (agent->upnp_timer_source == NULL)
goto end;
@@ -2842,7 +2820,7 @@ static void _upnp_error_mapping_port (GUPnPSimpleIgd *self, GError *error,
NiceAddress localaddr;
GSList *i;
- agent_lock();
+ agent_lock (agent);
nice_debug ("Agent %p : Error mapping %s:%d to %d (%d) : %s", agent, local_ip,
local_port, external_port, error->domain, error->message);
@@ -2880,7 +2858,7 @@ nice_agent_gather_candidates (
g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE);
g_return_val_if_fail (stream_id >= 1, FALSE);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL) {
@@ -3047,7 +3025,8 @@ nice_agent_gather_candidates (
agent->upnp_mapping = g_slist_prepend (agent->upnp_mapping, base_addr);
agent_timeout_add_with_context (agent, &agent->upnp_timer_source,
- "UPnP timeout", agent->upnp_timeout, priv_upnp_timeout_cb, agent);
+ "UPnP timeout", agent->upnp_timeout,
+ priv_upnp_timeout_cb_agent_locked, agent);
}
#endif
@@ -3221,7 +3200,7 @@ nice_agent_remove_stream (
g_return_if_fail (NICE_IS_AGENT (agent));
g_return_if_fail (stream_id >= 1);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (!stream) {
@@ -3265,7 +3244,7 @@ nice_agent_set_port_range (NiceAgent *agent, guint stream_id, guint component_id
g_return_if_fail (stream_id >= 1);
g_return_if_fail (component_id >= 1);
- agent_lock();
+ agent_lock (agent);
if (agent_find_component (agent, stream_id, component_id, &stream,
&component)) {
@@ -3288,7 +3267,7 @@ nice_agent_add_local_address (NiceAgent *agent, NiceAddress *addr)
g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE);
g_return_val_if_fail (addr != NULL, FALSE);
- agent_lock();
+ agent_lock (agent);
dupaddr = nice_address_dup (addr);
nice_address_set_port (dupaddr, 0);
@@ -3489,7 +3468,7 @@ nice_agent_set_remote_credentials (
g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE);
g_return_val_if_fail (stream_id >= 1, FALSE);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
/* note: oddly enough, ufrag and pwd can be empty strings */
@@ -3522,7 +3501,7 @@ nice_agent_set_local_credentials (
g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE);
g_return_val_if_fail (stream_id >= 1, FALSE);
- agent_lock ();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
@@ -3553,7 +3532,7 @@ nice_agent_get_local_credentials (
g_return_val_if_fail (NICE_IS_AGENT (agent), FALSE);
g_return_val_if_fail (stream_id >= 1, FALSE);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL) {
@@ -3623,7 +3602,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
nice_debug ("Agent %p: set_remote_candidates %d %d", agent, stream_id, component_id);
- agent_lock();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
@@ -4422,7 +4401,7 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
}
}
- agent_lock ();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
@@ -4516,9 +4495,9 @@ nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
memcpy (&prev_recv_messages_iter, &component->recv_messages_iter,
sizeof (NiceInputMessageIter));
- agent_unlock ();
+ agent_unlock (agent);
g_main_context_iteration (context, blocking);
- agent_lock ();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
@@ -4709,7 +4688,7 @@ nice_agent_send_messages_nonblocking_internal (
g_assert (n_messages == 1 || !allow_partial);
- agent_lock ();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
@@ -4956,7 +4935,7 @@ nice_agent_get_local_candidates (
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
- agent_lock();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) {
goto done;
@@ -4990,7 +4969,7 @@ nice_agent_get_remote_candidates (
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
- agent_lock();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
{
goto done;
@@ -5010,7 +4989,7 @@ nice_agent_restart (
{
GSList *i;
- agent_lock();
+ agent_lock (agent);
/* step: regenerate tie-breaker value */
priv_generate_tie_breaker (agent);
@@ -5040,7 +5019,7 @@ nice_agent_restart_stream (
gboolean res = FALSE;
NiceStream *stream;
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (!stream) {
@@ -5066,6 +5045,8 @@ nice_agent_dispose (GObject *object)
QueuedSignal *sig;
NiceAgent *agent = NICE_AGENT (object);
+ agent_lock (agent);
+
/* step: free resources for the binding discovery timers */
discovery_free (agent);
g_assert (agent->discovery_list == NULL);
@@ -5131,6 +5112,10 @@ nice_agent_dispose (GObject *object)
g_main_context_unref (agent->main_context);
agent->main_context = NULL;
+ agent_unlock (agent);
+
+ g_mutex_clear (&agent->agent_mutex);
+
if (G_OBJECT_CLASS (nice_agent_parent_class)->dispose)
G_OBJECT_CLASS (nice_agent_parent_class)->dispose (object);
@@ -5146,20 +5131,20 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data)
gboolean has_io_callback;
gboolean remove_source = FALSE;
- agent_lock ();
+ component = socket_source->component;
+ agent = component->agent;
+ stream = component->stream;
+
+ agent_lock (agent);
if (g_source_is_destroyed (g_main_current_source ())) {
/* Silently return FALSE. */
nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ());
- agent_unlock ();
+ agent_unlock (agent);
return G_SOURCE_REMOVE;
}
- component = socket_source->component;
- agent = component->agent;
- stream = component->stream;
-
g_object_ref (agent);
/* Remove disconnected sockets when we get a HUP */
@@ -5176,7 +5161,7 @@ component_io_cb (GSocket *gsocket, GIOCondition condition, gpointer user_data)
}
nice_component_remove_socket (component, socket_source->socket);
- agent_unlock ();
+ agent_unlock (agent);
g_object_unref (agent);
return G_SOURCE_REMOVE;
}
@@ -5349,7 +5334,7 @@ done:
if (component->n_recv_messages == 0 && component->recv_messages == NULL) {
agent_unlock_and_emit (agent);
} else {
- agent_unlock ();
+ agent_unlock (agent);
}
g_object_unref (agent);
@@ -5380,7 +5365,7 @@ nice_agent_attach_recv (
g_return_val_if_fail (stream_id >= 1, FALSE);
g_return_val_if_fail (component_id >= 1, FALSE);
- agent_lock();
+ agent_lock (agent);
/* attach candidates */
@@ -5435,7 +5420,7 @@ nice_agent_set_selected_pair (
g_return_val_if_fail (lfoundation, FALSE);
g_return_val_if_fail (rfoundation, FALSE);
- agent_lock();
+ agent_lock (agent);
/* step: check that params specify an existing pair */
if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
@@ -5497,7 +5482,7 @@ nice_agent_get_selected_pair (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (local != NULL, FALSE);
g_return_val_if_fail (remote != NULL, FALSE);
- agent_lock();
+ agent_lock (agent);
/* step: check that params specify an existing pair */
if (!agent_find_component (agent, stream_id, component_id,
@@ -5529,7 +5514,7 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
- agent_lock();
+ agent_lock (agent);
/* Reliable streams are pseudotcp or MUST use RFC 4571 framing */
if (agent->reliable)
@@ -5560,16 +5545,82 @@ nice_agent_get_selected_socket (NiceAgent *agent, guint stream_id,
return g_socket;
}
+typedef struct _TimeoutData
+{
+ GWeakRef/*<NiceAgent>*/ agent_ref;
+ NiceTimeoutLockedCallback function;
+ gpointer user_data;
+} TimeoutData;
+
+static void
+timeout_data_destroy (TimeoutData *data)
+{
+ g_weak_ref_clear (&data->agent_ref);
+ g_slice_free (TimeoutData, data);
+}
+
+static TimeoutData *
+timeout_data_new (NiceAgent *agent, NiceTimeoutLockedCallback function,
+ gpointer user_data)
+{
+ TimeoutData *data = g_slice_new0 (TimeoutData);
+
+ g_weak_ref_init (&data->agent_ref, agent);
+ data->function = function;
+ data->user_data = user_data;
+
+ return data;
+}
+
+static gboolean
+timeout_cb (gpointer user_data)
+{
+ TimeoutData *data = user_data;
+ NiceAgent *agent;
+ gboolean ret = G_SOURCE_REMOVE;
+
+ agent = g_weak_ref_get (&data->agent_ref);
+ if (agent == NULL) {
+ return G_SOURCE_REMOVE;
+ }
+
+ agent_lock (agent);
+
+ /* A race condition might happen where the mutex above waits for the lock
+ * and in the meantime another thread destroys the source.
+ * In that case, we don't need to run the function since it should
+ * have been cancelled */
+ if (g_source_is_destroyed (g_main_current_source ())) {
+ nice_debug ("Source was destroyed. Avoided race condition in timeout_cb");
+
+ agent_unlock (agent);
+ goto end;
+ }
+
+ ret = data->function (agent, data->user_data);
+
+ agent_unlock_and_emit (agent);
+
+ end:
+ g_object_unref (agent);
+
+ return ret;
+}
+
/* Create a new timer GSource with the given @name, @interval, callback
* @function and @data, and assign it to @out, destroying and freeing any
* existing #GSource in @out first.
*
* This guarantees that a timer won’t be overwritten without being destroyed.
+ *
+ * @interval is given in milliseconds.
*/
-void agent_timeout_add_with_context (NiceAgent *agent, GSource **out,
- const gchar *name, guint interval, GSourceFunc function, gpointer data)
+static void agent_timeout_add_with_context_internal (NiceAgent *agent,
+ GSource **out, const gchar *name, guint interval, gboolean seconds,
+ NiceTimeoutLockedCallback function, gpointer user_data)
{
GSource *source;
+ TimeoutData *data;
g_return_if_fail (function != NULL);
g_return_if_fail (out != NULL);
@@ -5582,16 +5633,36 @@ void agent_timeout_add_with_context (NiceAgent *agent, GSource **out,
}
/* Create the new source. */
- source = g_timeout_source_new (interval);
+ if (seconds)
+ source = g_timeout_source_new_seconds (interval);
+ else
+ source = g_timeout_source_new (interval);
g_source_set_name (source, name);
- g_source_set_callback (source, function, data, NULL);
+ data = timeout_data_new (agent, function, user_data);
+ g_source_set_callback (source, timeout_cb, data,
+ (GDestroyNotify)timeout_data_destroy);
g_source_attach (source, agent->main_context);
/* Return it! */
*out = source;
}
+void agent_timeout_add_with_context (NiceAgent *agent,
+ GSource **out, const gchar *name, guint interval,
+ NiceTimeoutLockedCallback function, gpointer user_data)
+{
+ agent_timeout_add_with_context_internal (agent, out, name, interval, FALSE,
+ function, user_data);
+}
+
+void agent_timeout_add_seconds_with_context (NiceAgent *agent,
+ GSource **out, const gchar *name, guint interval,
+ NiceTimeoutLockedCallback function, gpointer user_data)
+{
+ agent_timeout_add_with_context_internal (agent, out, name, interval, TRUE,
+ function, user_data);
+}
NICEAPI_EXPORT gboolean
nice_agent_set_selected_remote_candidate (
@@ -5612,7 +5683,7 @@ nice_agent_set_selected_remote_candidate (
g_return_val_if_fail (component_id != 0, FALSE);
g_return_val_if_fail (candidate != NULL, FALSE);
- agent_lock();
+ agent_lock (agent);
/* step: check if the component exists*/
if (!agent_find_component (agent, stream_id, component_id, &stream, &component)) {
@@ -5701,7 +5772,7 @@ nice_agent_set_stream_tos (NiceAgent *agent,
g_return_if_fail (NICE_IS_AGENT (agent));
g_return_if_fail (stream_id >= 1);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL)
@@ -5727,7 +5798,7 @@ nice_agent_set_software (NiceAgent *agent, const gchar *software)
{
g_return_if_fail (NICE_IS_AGENT (agent));
- agent_lock();
+ agent_lock (agent);
g_free (agent->software_attribute);
if (software)
@@ -5762,7 +5833,7 @@ nice_agent_set_stream_name (NiceAgent *agent, guint stream_id,
" are valid", name);
}
- agent_lock();
+ agent_lock (agent);
for (i = agent->streams; i; i = i->next) {
NiceStream *stream = i->data;
@@ -5797,7 +5868,7 @@ nice_agent_get_stream_name (NiceAgent *agent, guint stream_id)
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
g_return_val_if_fail (stream_id >= 1, NULL);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL)
@@ -5871,7 +5942,7 @@ nice_agent_get_default_local_candidate (NiceAgent *agent,
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (component_id >= 1, NULL);
- agent_lock ();
+ agent_lock (agent);
/* step: check if the component exists*/
if (!agent_find_component (agent, stream_id, component_id,
@@ -6030,7 +6101,7 @@ nice_agent_generate_local_sdp (NiceAgent *agent)
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
- agent_lock();
+ agent_lock (agent);
for (i = agent->streams; i; i = i->next) {
NiceStream *stream = i->data;
@@ -6054,7 +6125,7 @@ nice_agent_generate_local_stream_sdp (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
g_return_val_if_fail (stream_id >= 1, NULL);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL)
@@ -6079,7 +6150,7 @@ nice_agent_generate_local_candidate_sdp (NiceAgent *agent,
g_return_val_if_fail (NICE_IS_AGENT (agent), NULL);
g_return_val_if_fail (candidate != NULL, NULL);
- agent_lock();
+ agent_lock (agent);
sdp = g_string_new (NULL);
_generate_candidate_sdp (agent, candidate, sdp);
@@ -6101,7 +6172,7 @@ nice_agent_parse_remote_sdp (NiceAgent *agent, const gchar *sdp)
g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
g_return_val_if_fail (sdp != NULL, -1);
- agent_lock();
+ agent_lock (agent);
for (l = agent->streams; l; l = l->next) {
NiceStream *stream = l->data;
@@ -6193,7 +6264,7 @@ nice_agent_parse_remote_stream_sdp (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (stream_id >= 1, NULL);
g_return_val_if_fail (sdp != NULL, NULL);
- agent_lock();
+ agent_lock (agent);
stream = agent_find_stream (agent, stream_id);
if (stream == NULL) {
@@ -6372,7 +6443,7 @@ nice_agent_get_io_stream (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (agent->reliable, NULL);
- agent_lock ();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id, NULL, &component))
goto done;
@@ -6398,7 +6469,7 @@ nice_agent_forget_relays (NiceAgent *agent, guint stream_id, guint component_id)
g_return_val_if_fail (stream_id >= 1, FALSE);
g_return_val_if_fail (component_id >= 1, FALSE);
- agent_lock ();
+ agent_lock (agent);
if (!agent_find_component (agent, stream_id, component_id, NULL, &component)) {
ret = FALSE;
@@ -6455,12 +6526,12 @@ nice_agent_get_component_state (NiceAgent *agent,
NiceComponentState state = NICE_COMPONENT_STATE_FAILED;
NiceComponent *component;
- agent_lock ();
+ agent_lock (agent);
if (agent_find_component (agent, stream_id, component_id, NULL, &component))
state = component->state;
- agent_unlock ();
+ agent_unlock (agent);
return state;
}