diff options
author | Philip Withnall <philip.withnall@collabora.co.uk> | 2013-12-16 14:02:28 +0000 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2014-01-31 01:48:58 -0500 |
commit | 243c47ecc9d694ecfe230880081634936770a959 (patch) | |
tree | fb6d863067a4036407d1665206c959c753993ad5 /agent | |
parent | c56727025dd1ffa2e0513bf6bfc5218b58e2b483 (diff) | |
download | libnice-243c47ecc9d694ecfe230880081634936770a959.tar.gz |
agent: Add nice_agent_recv() allowing blocking receives on sockets
This is a blocking receive function, designed to be called from a worker
thread. It cannot be used in conjunction with the existing
nice_agent_attach_recv() API, as the blocking receive and the GSource
would compete over access to the single copy of the data in the kernel’s
receive buffer.
Diffstat (limited to 'agent')
-rw-r--r-- | agent/agent-priv.h | 3 | ||||
-rw-r--r-- | agent/agent.c | 363 | ||||
-rw-r--r-- | agent/agent.h | 9 | ||||
-rw-r--r-- | agent/component.c | 32 | ||||
-rw-r--r-- | agent/component.h | 20 |
5 files changed, 322 insertions, 105 deletions
diff --git a/agent/agent-priv.h b/agent/agent-priv.h index 350d1a3..b2d5c54 100644 --- a/agent/agent-priv.h +++ b/agent/agent-priv.h @@ -178,4 +178,7 @@ component_io_cb ( GIOCondition condition, gpointer data); +gssize agent_recv_locked (NiceAgent *agent, Stream *stream, + Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len); + #endif /*_NICE_AGENT_PRIV_H */ diff --git a/agent/agent.c b/agent/agent.c index 647b543..bd21e78 100644 --- a/agent/agent.c +++ b/agent/agent.c @@ -1,7 +1,7 @@ /* * This file is part of the Nice GLib ICE library. * - * (C) 2006-2010 Collabora Ltd. + * (C) 2006-2010, 2013 Collabora Ltd. * Contact: Youness Alaoui * (C) 2006-2010 Nokia Corporation. All rights reserved. * Contact: Kai Vehmanen @@ -25,6 +25,7 @@ * Dafydd Harries, Collabora Ltd. * Youness Alaoui, Collabora Ltd. * Kai Vehmanen, Nokia + * Philip Withnall, Collabora Ltd. * * Alternatively, the contents of this file may be used under the terms of the * the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which @@ -982,21 +983,6 @@ nice_agent_set_property ( } - -static void priv_destroy_component_tcp (Component *component) -{ - if (component->tcp_clock) { - g_source_destroy (component->tcp_clock); - g_source_unref (component->tcp_clock); - component->tcp_clock = NULL; - } - if (component->tcp) { - pseudo_tcp_socket_close (component->tcp, TRUE); - g_object_unref (component->tcp); - component->tcp = NULL; - } -} - static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream, Component *component) { @@ -1004,8 +990,16 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream, agent_signal_component_state_change (agent, stream->id, component->id, NICE_COMPONENT_STATE_FAILED); component_detach_all_sockets (component); + pseudo_tcp_socket_close (component->tcp, TRUE); + g_object_unref (component->tcp); + component->tcp = NULL; + } + + if (component->tcp_clock) { + g_source_destroy (component->tcp_clock); + g_source_unref (component->tcp_clock); + component->tcp_clock = NULL; } - priv_destroy_component_tcp (component); } static void @@ -1033,6 +1027,7 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data) Stream *stream = component->stream; guint8 buf[MAX_BUFFER_SIZE]; gssize len; + gboolean has_io_callback; nice_debug ("Agent %p: s%d:%d pseudo Tcp socket readable", agent, stream->id, component->id); @@ -1041,30 +1036,58 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data) g_object_add_weak_pointer (G_OBJECT (sock), (gpointer *)&sock); g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent); + has_io_callback = component_has_io_callback (component); do { - gboolean has_io_callback = component_has_io_callback (component); - - if (has_io_callback) + /* Only dequeue pseudo-TCP data if we can reliably inform the client. */ + if (has_io_callback) { len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf)); - else + } else if (component->recv_buf != NULL) { + len = pseudo_tcp_socket_recv (sock, + (gchar *) component->recv_buf + component->recv_buf_valid_len, + component->recv_buf_len - component->recv_buf_valid_len); + } else { len = 0; + } - if (len > 0) { + nice_debug ("%s: len %" G_GSSIZE_FORMAT, G_STRFUNC, len); + + if (len > 0 && has_io_callback) { component_emit_io_callback (component, buf, len); if (sock == NULL) { nice_debug ("PseudoTCP socket got destroyed in readable callback!"); break; } + } else if (len > 0 && component->recv_buf != NULL) { + /* No callback to call. The data has been copied directly into the + * client’s receive buffer. */ + component->recv_buf_valid_len += len; } else if (len < 0 && pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) { /* Signal error */ + nice_debug ("%s: calling priv_pseudo_tcp_error()", G_STRFUNC); priv_pseudo_tcp_error (agent, stream, component); + + if (component->recv_buf != NULL) { + GIOErrorEnum error_code; + + if (pseudo_tcp_socket_get_error (sock) == ENOTCONN) + error_code = G_IO_ERROR_BROKEN_PIPE; + else + error_code = G_IO_ERROR_FAILED; + + g_set_error (component->recv_buf_error, G_IO_ERROR, error_code, + "Error reading data from pseudo-TCP socket."); + } } else if (len < 0 && pseudo_tcp_socket_get_error (sock) == EWOULDBLOCK){ component->tcp_readable = FALSE; } - } while (len > 0); + + has_io_callback = component_has_io_callback (component); + } while (len > 0 && + (has_io_callback || + component->recv_buf_valid_len < component->recv_buf_len)); if (agent) { adjust_tcp_clock (agent, stream, component); @@ -1097,8 +1120,8 @@ pseudo_tcp_socket_closed (PseudoTcpSocket *sock, guint32 err, NiceAgent *agent = component->agent; Stream *stream = component->stream; - nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed", agent, - stream->id, component->id); + nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed. " + "Calling priv_pseudo_tcp_error().", agent, stream->id, component->id); priv_pseudo_tcp_error (agent, stream, component); } @@ -1117,8 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket, gchar tmpbuf[INET6_ADDRSTRLEN]; nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf); - nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d", - component->agent, component->stream->id, component->id, len, tmpbuf, + nice_debug ( + "Agent %p : s%d:%d: sending %d bytes on socket %p (FD %d) to [%s]:%d", + component->agent, component->stream->id, component->id, len, + sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf, nice_address_get_port (&component->selected_pair.remote->addr)); #endif @@ -1176,7 +1201,8 @@ adjust_tcp_clock (NiceAgent *agent, Stream *stream, Component *component) component->tcp_clock = agent_timeout_add_with_context (agent, timeout, notify_pseudo_tcp_socket_clock, component); } else { - nice_debug ("Agent %p: component %d pseudo tcp socket should be destroyed", + nice_debug ("Agent %p: component %d pseudo-TCP socket should be " + "destroyed. Calling priv_pseudo_tcp_error().", agent, component->id); priv_pseudo_tcp_error (agent, stream, component); } @@ -2302,7 +2328,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo } /* - * _nice_agent_recv_locked: + * agent_recv_locked: * @agent: a #NiceAgent * @stream: the stream to receive from * @component: the component to receive from @@ -2311,15 +2337,17 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo * @buf_len: the length of @buf * * Receive up to @buf_len bytes of data from the given - * @stream/@component/@socket, in a non-blocking fashion. + * @stream/@component/@socket, in a non-blocking fashion. If the socket is a + * datagram socket and @buf_len is not big enough to hold an entire packet, the + * remaining bytes of the packet will be silently dropped. * * NOTE: Must be called with the agent’s lock held. * * Returns: number of bytes stored in @buf, 0 if no data is available, or -1 on * error */ -static gssize -_nice_agent_recv_locked ( +gssize +agent_recv_locked ( NiceAgent *agent, Stream *stream, Component *component, @@ -2330,9 +2358,15 @@ _nice_agent_recv_locked ( NiceAddress from; gssize len; GList *item; + guint8 local_buf[MAX_BUFFER_SIZE]; + gsize local_buf_len = MAX_BUFFER_SIZE; - /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success. */ - len = nice_socket_recv (socket, &from, buf_len, (gchar *) buf); + /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success. + * + * FIXME: We have to receive into a local buffer then copy out because + * otherwise, if @buf is too small, we could lose data, even when in + * reliable mode (because reliable streams are packetised). */ + len = nice_socket_recv (socket, &from, local_buf_len, (gchar *) local_buf); if (len == 0) { return 0; @@ -2352,14 +2386,6 @@ _nice_agent_recv_locked ( } #endif - - if ((gsize) len > buf_len) - { - /* buffer is not big enough to accept this packet */ - /* XXX: test this case */ - return 0; - } - for (item = component->turn_servers; item; item = g_list_next (item)) { TurnServer *turn = item->data; if (nice_address_equal (&from, &turn->server)) { @@ -2374,7 +2400,7 @@ _nice_agent_recv_locked ( cand->stream_id == stream->id && cand->component_id == component->id) { len = nice_turn_socket_parse_recv (cand->sockptr, &socket, - &from, len, (gchar *) buf, &from, (gchar *) buf, len); + &from, len, (gchar *) local_buf, &from, (gchar *) local_buf, len); } } break; @@ -2383,15 +2409,15 @@ _nice_agent_recv_locked ( agent->media_after_tick = TRUE; - if (stun_message_validate_buffer_length ((uint8_t *) buf, (size_t) len, + if (stun_message_validate_buffer_length ((uint8_t *) local_buf, (size_t) len, (agent->compatibility != NICE_COMPATIBILITY_OC2007 && agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) != len) - /* If the retval is no 0, its not a valid stun packet, probably data */ + /* If the retval is not 0, it’s not a valid STUN packet; probably data. */ goto handle_tcp; if (conn_check_handle_inbound_stun (agent, stream, component, socket, - &from, (gchar *) buf, len)) + &from, (gchar *) local_buf, len)) /* handled STUN message*/ return 0; @@ -2400,7 +2426,10 @@ handle_tcp: if (len > 0 && component->tcp) { /* Received data on a reliable connection. */ g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent); - pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) buf, len); + + nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSSIZE_FORMAT, + G_STRFUNC, len); + pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) local_buf, len); if (agent) { adjust_tcp_clock (agent, stream, component); @@ -2418,9 +2447,194 @@ handle_tcp: return 0; } + /* Yay for poor performance! */ + if (len >= 0) { + len = MIN (buf_len, (gsize) len); + memcpy (buf, local_buf, len); + } + return len; } +static gboolean +nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data) +{ + GError **error = user_data; + return !g_cancellable_set_error_if_cancelled (cancellable, error); +} + +/** + * nice_agent_recv: + * @agent: a #NiceAgent + * @stream_id: the ID of the stream to receive on + * @component_id: the ID of the component to receive on + * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer + * to write the received data into, of length at least @buf_len + * @buf_len: length of @buf + * @cancellable: (allow-none): a #GCancellable to allow the operation to be + * cancelled from another thread, or %NULL + * @error: (allow-none): return location for a #GError, or %NULL + * + * Block on receiving data from the given stream/component combination on + * @agent, returning only once at least 1 byte has been received and written + * into @buf, the stream is closed by the other end or by calling + * nice_agent_remove_stream(), or @cancellable is cancelled. + * + * In the non-error case, in reliable mode, this will block until exactly + * @buf_len bytes have been received. In non-reliable mode, it will block until + * a single message has been received. In this case, @buf must be big enough to + * contain an entire message (65536 bytes), or any excess data may be silently + * dropped. + * + * This must not be used in combination with nice_agent_attach_recv() on the + * same stream/component pair. + * + * Internally, this may iterate the current thread’s default main context. + * + * If the stream/component pair doesn’t exist, or if a suitable candidate socket + * hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be + * returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was + * cancelled. %G_IO_ERROR_FAILED will be returned for other errors. + * + * Returns: the number of bytes written to @buf on success (guaranteed to be + * greater than 0 unless @buf_len is 0), or -1 on error + * + * Since: 0.1.5 + */ +NICEAPI_EXPORT gssize +nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id, + guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error) +{ + GMainContext *context; + Stream *stream; + Component *component; + gssize len = -1; + GSource *cancellable_source = NULL; + gboolean received_enough = FALSE, error_reported = FALSE; + GError *child_error = NULL; + + g_return_val_if_fail (NICE_IS_AGENT (agent), -1); + g_return_val_if_fail (stream_id >= 1, -1); + g_return_val_if_fail (component_id >= 1, -1); + g_return_val_if_fail (buf != NULL, -1); + g_return_val_if_fail ( + cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1); + g_return_val_if_fail (error == NULL || *error == NULL, -1); + + if (buf_len == 0) + return 0; + + agent_lock (); + + /* We’re not going to do the + * implement-a-ring-buffer-to-cater-for-tiny-input-buffers game, so just warn + * if the buffer size is too small, and silently drop any overspilling + * bytes. */ + g_warn_if_fail (agent->reliable || buf_len >= MAX_BUFFER_SIZE); + + if (!agent_find_component (agent, stream_id, component_id, + &stream, &component)) { + g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE, + "Invalid stream/component."); + goto done; + } + + /* Set the component’s receive buffer. */ + context = component_dup_io_context (component); + component_set_io_callback (component, NULL, NULL, buf, buf_len, &child_error); + + /* Add the cancellable as a source. */ + if (cancellable != NULL) { + cancellable_source = g_cancellable_source_new (cancellable); + g_source_set_callback (cancellable_source, + (GSourceFunc) nice_agent_recv_cancelled_cb, &child_error, NULL); + g_source_attach (cancellable_source, context); + } + + /* Is there already pending data left over from having an I/O callback + * attached and switching to using nice_agent_recv()? This is a horrifically + * specific use case which I hope nobody ever tries. And yet, it still must be + * supported. */ + g_mutex_lock (&component->io_mutex); + + while (!received_enough && + !g_queue_is_empty (&component->pending_io_messages)) { + IOCallbackData *data; + gsize copied_len; + + g_assert (component->io_callback_id == 0); + + data = g_queue_peek_head (&component->pending_io_messages); + copied_len = MIN (data->buf_len - data->offset, + component->recv_buf_len - component->recv_buf_valid_len); + + memcpy (component->recv_buf + component->recv_buf_valid_len, + data->buf + data->offset, len); + component->recv_buf_valid_len += copied_len; + + /* If we only managed to grab part of the buffer, leave the buffer in the + * queue and have another go at it later. */ + if (copied_len < data->buf_len - data->offset) { + data->offset += copied_len; + } else { + g_queue_pop_head (&component->pending_io_messages); + io_callback_data_free (data); + } + + received_enough = + ((agent->reliable && component->recv_buf_valid_len >= buf_len) || + (!agent->reliable && component->recv_buf_valid_len > 0)); + } + + g_mutex_unlock (&component->io_mutex); + + /* Each iteration of the main context will either receive some data, a + * cancellation error or a socket error. + * + * In reliable mode, iterate the loop enough to receive exactly @buf_len + * bytes. In non-reliable mode, iterate the loop to receive a single message. + */ + while (!received_enough && !error_reported) { + agent_unlock (); + g_main_context_iteration (context, TRUE); + agent_lock (); + + received_enough = + ((agent->reliable && component->recv_buf_valid_len >= buf_len) || + (!agent->reliable && component->recv_buf_valid_len > 0)); + error_reported = (child_error != NULL); + } + + len = component->recv_buf_valid_len; + nice_debug ("%s: len: %" G_GSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT, + G_STRFUNC, len, buf_len); + + /* Tidy up. */ + if (cancellable_source != NULL) { + g_source_destroy (cancellable_source); + g_source_unref (cancellable_source); + } + + component_set_io_callback (component, NULL, NULL, NULL, 0, NULL); + g_main_context_unref (context); + + /* Handle errors and cancellations. */ + if (!received_enough) { + g_assert (error_reported); + len = -1; + } + +done: + g_assert ((child_error != NULL) == (len == -1)); + g_assert (len != 0); + + if (child_error != NULL) + g_propagate_error (error, child_error); + + agent_unlock (); + + return len; +} NICEAPI_EXPORT gint nice_agent_send ( @@ -2624,46 +2838,6 @@ nice_agent_dispose (GObject *object) } - -typedef struct _IOCtx IOCtx; - -struct _IOCtx -{ - GSource *source; - NiceAgent *agent; - Stream *stream; - Component *component; - NiceSocket *socket; -}; - - -static IOCtx * -io_ctx_new ( - NiceAgent *agent, - Stream *stream, - Component *component, - NiceSocket *socket, - GSource *source) -{ - IOCtx *ctx; - - ctx = g_slice_new0 (IOCtx); - ctx->agent = agent; - ctx->stream = stream; - ctx->component = component; - ctx->socket = socket; - ctx->source = source; - - return ctx; -} - - -static void -io_ctx_free (IOCtx *ctx) -{ - g_slice_free (IOCtx, ctx); -} - gboolean component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data) { @@ -2676,7 +2850,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data) guint8 *recv_buf; gsize recv_buf_len; gboolean retval = FALSE; - NiceAgentRecvFunc io_callback; + gboolean has_io_callback; agent_lock (); @@ -2690,17 +2864,14 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data) goto done; } - /* FIXME: Compartmentalise this in component.c */ - g_mutex_lock (&component->io_mutex); - io_callback = component->io_callback; - g_mutex_unlock (&component->io_mutex); + has_io_callback = component_has_io_callback (component); /* Choose which receive buffer to use. If we’re reading for * nice_agent_attach_recv(), use a local static buffer. If we’re reading for * nice_agent_recv(), use the buffer provided by the client. */ - g_assert (io_callback == NULL || component->recv_buf == NULL); + g_assert (!has_io_callback || component->recv_buf == NULL); - if (io_callback != NULL) { + if (has_io_callback) { recv_buf = local_buf; recv_buf_len = sizeof (local_buf); } else if (component->recv_buf != NULL) { @@ -2739,7 +2910,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data) } /* Actual data to notify the client about. */ - if (io_callback != NULL) { + if (has_io_callback) { component_emit_io_callback (component, recv_buf, len); } else { /* Data has been stored in the component’s receive buffer to be picked up @@ -2784,7 +2955,7 @@ nice_agent_attach_recv ( /* Set the component’s I/O context. */ component_set_io_context (component, ctx); - component_set_io_callback (component, func, data); + component_set_io_callback (component, func, data, NULL, 0, NULL); ret = TRUE; if (func) { diff --git a/agent/agent.h b/agent/agent.h index c2a39d1..64df422 100644 --- a/agent/agent.h +++ b/agent/agent.h @@ -676,6 +676,15 @@ nice_agent_attach_recv ( NiceAgentRecvFunc func, gpointer data); +gssize +nice_agent_recv ( + NiceAgent *agent, + guint stream_id, + guint component_id, + guint8 *buf, + gsize buf_len, + GCancellable *cancellable, + GError **error); /** * nice_agent_set_selected_pair: diff --git a/agent/component.c b/agent/component.c index 62ecd4f..1bf6c33 100644 --- a/agent/component.c +++ b/agent/component.c @@ -61,7 +61,7 @@ component_deschedule_io_callback (Component *component); /* Must *not* take the agent lock, since it’s called from within - * component_set_io_callback(), which holds the Component’s I/O lock. */ + * component_set_io_context(), which holds the Component’s I/O lock. */ static void socket_source_attach (SocketSource *socket_source, GMainContext *context) { @@ -129,7 +129,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream) * will be updated when nice_agent_attach_recv() or nice_agent_recv() are * called. */ component_set_io_context (component, NULL); - component_set_io_callback (component, NULL, NULL); + component_set_io_callback (component, NULL, NULL, NULL, 0, NULL); return component; } @@ -438,7 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket) /* Reattaches socket handles of @component to the main context. * * Must *not* take the agent lock, since it’s called from within - * component_set_io_callback(), which holds the Component’s I/O lock. */ + * component_set_io_context(), which holds the Component’s I/O lock. */ static void component_reattach_all_sockets (Component *component) { @@ -486,7 +486,7 @@ component_detach_socket (Component *component, NiceSocket *socket) * sockets themselves untouched. * * Must *not* take the agent lock, since it’s called from within - * component_set_io_callback(), which holds the Component’s I/O lock. + * component_set_io_context(), which holds the Component’s I/O lock. */ void component_detach_all_sockets (Component *component) @@ -511,6 +511,12 @@ component_free_socket_sources (Component *component) component->socket_sources = NULL; } +GMainContext * +component_dup_io_context (Component *component) +{ + return g_main_context_ref (component->ctx); +} + /* If @context is %NULL, a fresh context is used, so component->ctx is always * guaranteed to be non-%NULL. */ void @@ -534,24 +540,40 @@ component_set_io_context (Component *component, GMainContext *context) g_mutex_unlock (&component->io_mutex); } +/* (func, user_data) and (recv_buf, recv_buf_len) are mutually exclusive. + * At most one of the two must be specified; if both are NULL, the Component + * will not receive any data (i.e. reception is paused). */ void component_set_io_callback (Component *component, - NiceAgentRecvFunc func, gpointer user_data) + NiceAgentRecvFunc func, gpointer user_data, + guint8 *recv_buf, gsize recv_buf_len, + GError **error) { + g_assert (func == NULL || recv_buf == NULL); + g_assert (recv_buf != NULL || recv_buf_len == 0); + g_assert (error == NULL || *error == NULL); + g_mutex_lock (&component->io_mutex); if (func != NULL) { component->io_callback = func; component->io_user_data = user_data; + component->recv_buf = NULL; + component->recv_buf_len = 0; component_schedule_io_callback (component); } else { component->io_callback = NULL; component->io_user_data = NULL; + component->recv_buf = recv_buf; + component->recv_buf_len = recv_buf_len; component_deschedule_io_callback (component); } + component->recv_buf_valid_len = 0; + component->recv_buf_error = error; + g_mutex_unlock (&component->io_mutex); } diff --git a/agent/component.h b/agent/component.h index 5186bc1..9879b46 100644 --- a/agent/component.h +++ b/agent/component.h @@ -150,7 +150,10 @@ struct _Component /* I/O handling. The main context must always be non-NULL, and is used for all * socket recv() operations. All io_callback emissions are invoked in this - * context too. */ + * context too. + * + * recv_buf and io_callback are mutually exclusive, but it is allowed for both + * to be NULL if the Component is not currently ready to receive data. */ GMutex io_mutex; /**< protects io_callback, io_user_data, pending_io_messages and io_callback_id. immutable: can be accessed without @@ -159,8 +162,6 @@ struct _Component taken before this one */ NiceAgentRecvFunc io_callback; /**< function called on io cb */ gpointer io_user_data; /**< data passed to the io function */ - GMainContext *ctx; /**< context for GSources for this - component */ GQueue pending_io_messages; /**< queue of packets which have been received but not passed to the client in an I/O callback or recv() call yet. @@ -168,6 +169,13 @@ struct _Component IOCallbackData */ guint io_callback_id; /* GSource ID of the I/O callback */ + GMainContext *ctx; /**< context for GSources for this + component */ + guint8 *recv_buf; /**< unowned buffer for receiving into */ + gsize recv_buf_len; /**< allocated size of recv_buf in bytes */ + gsize recv_buf_valid_len; /**< length of valid data in recv_buf */ + GError **recv_buf_error; /**< error information about failed reads */ + NiceAgent *agent; /* unowned, immutable: can be accessed without holding the * agent lock */ Stream *stream; /* unowned, immutable: can be accessed without holding the @@ -212,11 +220,15 @@ component_detach_all_sockets (Component *component); void component_free_socket_sources (Component *component); +GMainContext * +component_dup_io_context (Component *component); void component_set_io_context (Component *component, GMainContext *context); void component_set_io_callback (Component *component, - NiceAgentRecvFunc func, gpointer user_data); + NiceAgentRecvFunc func, gpointer user_data, + guint8 *recv_buf, gsize recv_buf_len, + GError **error); void component_emit_io_callback (Component *component, const guint8 *buf, gsize buf_len); |