summaryrefslogtreecommitdiff
path: root/agent
diff options
context:
space:
mode:
authorPhilip Withnall <philip.withnall@collabora.co.uk>2013-12-16 14:02:28 +0000
committerOlivier Crête <olivier.crete@collabora.com>2014-01-31 01:48:58 -0500
commit243c47ecc9d694ecfe230880081634936770a959 (patch)
treefb6d863067a4036407d1665206c959c753993ad5 /agent
parentc56727025dd1ffa2e0513bf6bfc5218b58e2b483 (diff)
downloadlibnice-243c47ecc9d694ecfe230880081634936770a959.tar.gz
agent: Add nice_agent_recv() allowing blocking receives on sockets
This is a blocking receive function, designed to be called from a worker thread. It cannot be used in conjunction with the existing nice_agent_attach_recv() API, as the blocking receive and the GSource would compete over access to the single copy of the data in the kernel’s receive buffer.
Diffstat (limited to 'agent')
-rw-r--r--agent/agent-priv.h3
-rw-r--r--agent/agent.c363
-rw-r--r--agent/agent.h9
-rw-r--r--agent/component.c32
-rw-r--r--agent/component.h20
5 files changed, 322 insertions, 105 deletions
diff --git a/agent/agent-priv.h b/agent/agent-priv.h
index 350d1a3..b2d5c54 100644
--- a/agent/agent-priv.h
+++ b/agent/agent-priv.h
@@ -178,4 +178,7 @@ component_io_cb (
GIOCondition condition,
gpointer data);
+gssize agent_recv_locked (NiceAgent *agent, Stream *stream,
+ Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len);
+
#endif /*_NICE_AGENT_PRIV_H */
diff --git a/agent/agent.c b/agent/agent.c
index 647b543..bd21e78 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -1,7 +1,7 @@
/*
* This file is part of the Nice GLib ICE library.
*
- * (C) 2006-2010 Collabora Ltd.
+ * (C) 2006-2010, 2013 Collabora Ltd.
* Contact: Youness Alaoui
* (C) 2006-2010 Nokia Corporation. All rights reserved.
* Contact: Kai Vehmanen
@@ -25,6 +25,7 @@
* Dafydd Harries, Collabora Ltd.
* Youness Alaoui, Collabora Ltd.
* Kai Vehmanen, Nokia
+ * Philip Withnall, Collabora Ltd.
*
* Alternatively, the contents of this file may be used under the terms of the
* the GNU Lesser General Public License Version 2.1 (the "LGPL"), in which
@@ -982,21 +983,6 @@ nice_agent_set_property (
}
-
-static void priv_destroy_component_tcp (Component *component)
-{
- if (component->tcp_clock) {
- g_source_destroy (component->tcp_clock);
- g_source_unref (component->tcp_clock);
- component->tcp_clock = NULL;
- }
- if (component->tcp) {
- pseudo_tcp_socket_close (component->tcp, TRUE);
- g_object_unref (component->tcp);
- component->tcp = NULL;
- }
-}
-
static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
Component *component)
{
@@ -1004,8 +990,16 @@ static void priv_pseudo_tcp_error (NiceAgent *agent, Stream *stream,
agent_signal_component_state_change (agent, stream->id,
component->id, NICE_COMPONENT_STATE_FAILED);
component_detach_all_sockets (component);
+ pseudo_tcp_socket_close (component->tcp, TRUE);
+ g_object_unref (component->tcp);
+ component->tcp = NULL;
+ }
+
+ if (component->tcp_clock) {
+ g_source_destroy (component->tcp_clock);
+ g_source_unref (component->tcp_clock);
+ component->tcp_clock = NULL;
}
- priv_destroy_component_tcp (component);
}
static void
@@ -1033,6 +1027,7 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
Stream *stream = component->stream;
guint8 buf[MAX_BUFFER_SIZE];
gssize len;
+ gboolean has_io_callback;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket readable", agent,
stream->id, component->id);
@@ -1041,30 +1036,58 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
g_object_add_weak_pointer (G_OBJECT (sock), (gpointer *)&sock);
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
+ has_io_callback = component_has_io_callback (component);
do {
- gboolean has_io_callback = component_has_io_callback (component);
-
- if (has_io_callback)
+ /* Only dequeue pseudo-TCP data if we can reliably inform the client. */
+ if (has_io_callback) {
len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf));
- else
+ } else if (component->recv_buf != NULL) {
+ len = pseudo_tcp_socket_recv (sock,
+ (gchar *) component->recv_buf + component->recv_buf_valid_len,
+ component->recv_buf_len - component->recv_buf_valid_len);
+ } else {
len = 0;
+ }
- if (len > 0) {
+ nice_debug ("%s: len %" G_GSSIZE_FORMAT, G_STRFUNC, len);
+
+ if (len > 0 && has_io_callback) {
component_emit_io_callback (component, buf, len);
if (sock == NULL) {
nice_debug ("PseudoTCP socket got destroyed in readable callback!");
break;
}
+ } else if (len > 0 && component->recv_buf != NULL) {
+ /* No callback to call. The data has been copied directly into the
+ * client’s receive buffer. */
+ component->recv_buf_valid_len += len;
} else if (len < 0 &&
pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) {
/* Signal error */
+ nice_debug ("%s: calling priv_pseudo_tcp_error()", G_STRFUNC);
priv_pseudo_tcp_error (agent, stream, component);
+
+ if (component->recv_buf != NULL) {
+ GIOErrorEnum error_code;
+
+ if (pseudo_tcp_socket_get_error (sock) == ENOTCONN)
+ error_code = G_IO_ERROR_BROKEN_PIPE;
+ else
+ error_code = G_IO_ERROR_FAILED;
+
+ g_set_error (component->recv_buf_error, G_IO_ERROR, error_code,
+ "Error reading data from pseudo-TCP socket.");
+ }
} else if (len < 0 &&
pseudo_tcp_socket_get_error (sock) == EWOULDBLOCK){
component->tcp_readable = FALSE;
}
- } while (len > 0);
+
+ has_io_callback = component_has_io_callback (component);
+ } while (len > 0 &&
+ (has_io_callback ||
+ component->recv_buf_valid_len < component->recv_buf_len));
if (agent) {
adjust_tcp_clock (agent, stream, component);
@@ -1097,8 +1120,8 @@ pseudo_tcp_socket_closed (PseudoTcpSocket *sock, guint32 err,
NiceAgent *agent = component->agent;
Stream *stream = component->stream;
- nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed", agent,
- stream->id, component->id);
+ nice_debug ("Agent %p: s%d:%d pseudo Tcp socket closed. "
+ "Calling priv_pseudo_tcp_error().", agent, stream->id, component->id);
priv_pseudo_tcp_error (agent, stream, component);
}
@@ -1117,8 +1140,10 @@ pseudo_tcp_socket_write_packet (PseudoTcpSocket *socket,
gchar tmpbuf[INET6_ADDRSTRLEN];
nice_address_to_string (&component->selected_pair.remote->addr, tmpbuf);
- nice_debug ("Agent %p : s%d:%d: sending %d bytes to [%s]:%d",
- component->agent, component->stream->id, component->id, len, tmpbuf,
+ nice_debug (
+ "Agent %p : s%d:%d: sending %d bytes on socket %p (FD %d) to [%s]:%d",
+ component->agent, component->stream->id, component->id, len,
+ sock->fileno, g_socket_get_fd (sock->fileno), tmpbuf,
nice_address_get_port (&component->selected_pair.remote->addr));
#endif
@@ -1176,7 +1201,8 @@ adjust_tcp_clock (NiceAgent *agent, Stream *stream, Component *component)
component->tcp_clock = agent_timeout_add_with_context (agent,
timeout, notify_pseudo_tcp_socket_clock, component);
} else {
- nice_debug ("Agent %p: component %d pseudo tcp socket should be destroyed",
+ nice_debug ("Agent %p: component %d pseudo-TCP socket should be "
+ "destroyed. Calling priv_pseudo_tcp_error().",
agent, component->id);
priv_pseudo_tcp_error (agent, stream, component);
}
@@ -2302,7 +2328,7 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
}
/*
- * _nice_agent_recv_locked:
+ * agent_recv_locked:
* @agent: a #NiceAgent
* @stream: the stream to receive from
* @component: the component to receive from
@@ -2311,15 +2337,17 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
* @buf_len: the length of @buf
*
* Receive up to @buf_len bytes of data from the given
- * @stream/@component/@socket, in a non-blocking fashion.
+ * @stream/@component/@socket, in a non-blocking fashion. If the socket is a
+ * datagram socket and @buf_len is not big enough to hold an entire packet, the
+ * remaining bytes of the packet will be silently dropped.
*
* NOTE: Must be called with the agent’s lock held.
*
* Returns: number of bytes stored in @buf, 0 if no data is available, or -1 on
* error
*/
-static gssize
-_nice_agent_recv_locked (
+gssize
+agent_recv_locked (
NiceAgent *agent,
Stream *stream,
Component *component,
@@ -2330,9 +2358,15 @@ _nice_agent_recv_locked (
NiceAddress from;
gssize len;
GList *item;
+ guint8 local_buf[MAX_BUFFER_SIZE];
+ gsize local_buf_len = MAX_BUFFER_SIZE;
- /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success. */
- len = nice_socket_recv (socket, &from, buf_len, (gchar *) buf);
+ /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success.
+ *
+ * FIXME: We have to receive into a local buffer then copy out because
+ * otherwise, if @buf is too small, we could lose data, even when in
+ * reliable mode (because reliable streams are packetised). */
+ len = nice_socket_recv (socket, &from, local_buf_len, (gchar *) local_buf);
if (len == 0) {
return 0;
@@ -2352,14 +2386,6 @@ _nice_agent_recv_locked (
}
#endif
-
- if ((gsize) len > buf_len)
- {
- /* buffer is not big enough to accept this packet */
- /* XXX: test this case */
- return 0;
- }
-
for (item = component->turn_servers; item; item = g_list_next (item)) {
TurnServer *turn = item->data;
if (nice_address_equal (&from, &turn->server)) {
@@ -2374,7 +2400,7 @@ _nice_agent_recv_locked (
cand->stream_id == stream->id &&
cand->component_id == component->id) {
len = nice_turn_socket_parse_recv (cand->sockptr, &socket,
- &from, len, (gchar *) buf, &from, (gchar *) buf, len);
+ &from, len, (gchar *) local_buf, &from, (gchar *) local_buf, len);
}
}
break;
@@ -2383,15 +2409,15 @@ _nice_agent_recv_locked (
agent->media_after_tick = TRUE;
- if (stun_message_validate_buffer_length ((uint8_t *) buf, (size_t) len,
+ if (stun_message_validate_buffer_length ((uint8_t *) local_buf, (size_t) len,
(agent->compatibility != NICE_COMPATIBILITY_OC2007 &&
agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) != len)
- /* If the retval is no 0, its not a valid stun packet, probably data */
+ /* If the retval is not 0, it’s not a valid STUN packet; probably data. */
goto handle_tcp;
if (conn_check_handle_inbound_stun (agent, stream, component, socket,
- &from, (gchar *) buf, len))
+ &from, (gchar *) local_buf, len))
/* handled STUN message*/
return 0;
@@ -2400,7 +2426,10 @@ handle_tcp:
if (len > 0 && component->tcp) {
/* Received data on a reliable connection. */
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
- pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) buf, len);
+
+ nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSSIZE_FORMAT,
+ G_STRFUNC, len);
+ pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) local_buf, len);
if (agent) {
adjust_tcp_clock (agent, stream, component);
@@ -2418,9 +2447,194 @@ handle_tcp:
return 0;
}
+ /* Yay for poor performance! */
+ if (len >= 0) {
+ len = MIN (buf_len, (gsize) len);
+ memcpy (buf, local_buf, len);
+ }
+
return len;
}
+static gboolean
+nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
+{
+ GError **error = user_data;
+ return !g_cancellable_set_error_if_cancelled (cancellable, error);
+}
+
+/**
+ * nice_agent_recv:
+ * @agent: a #NiceAgent
+ * @stream_id: the ID of the stream to receive on
+ * @component_id: the ID of the component to receive on
+ * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
+ * to write the received data into, of length at least @buf_len
+ * @buf_len: length of @buf
+ * @cancellable: (allow-none): a #GCancellable to allow the operation to be
+ * cancelled from another thread, or %NULL
+ * @error: (allow-none): return location for a #GError, or %NULL
+ *
+ * Block on receiving data from the given stream/component combination on
+ * @agent, returning only once at least 1 byte has been received and written
+ * into @buf, the stream is closed by the other end or by calling
+ * nice_agent_remove_stream(), or @cancellable is cancelled.
+ *
+ * In the non-error case, in reliable mode, this will block until exactly
+ * @buf_len bytes have been received. In non-reliable mode, it will block until
+ * a single message has been received. In this case, @buf must be big enough to
+ * contain an entire message (65536 bytes), or any excess data may be silently
+ * dropped.
+ *
+ * This must not be used in combination with nice_agent_attach_recv() on the
+ * same stream/component pair.
+ *
+ * Internally, this may iterate the current thread’s default main context.
+ *
+ * If the stream/component pair doesn’t exist, or if a suitable candidate socket
+ * hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
+ * returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
+ * cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
+ *
+ * Returns: the number of bytes written to @buf on success (guaranteed to be
+ * greater than 0 unless @buf_len is 0), or -1 on error
+ *
+ * Since: 0.1.5
+ */
+NICEAPI_EXPORT gssize
+nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
+ guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error)
+{
+ GMainContext *context;
+ Stream *stream;
+ Component *component;
+ gssize len = -1;
+ GSource *cancellable_source = NULL;
+ gboolean received_enough = FALSE, error_reported = FALSE;
+ GError *child_error = NULL;
+
+ g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
+ g_return_val_if_fail (stream_id >= 1, -1);
+ g_return_val_if_fail (component_id >= 1, -1);
+ g_return_val_if_fail (buf != NULL, -1);
+ g_return_val_if_fail (
+ cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
+ g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+ if (buf_len == 0)
+ return 0;
+
+ agent_lock ();
+
+ /* We’re not going to do the
+ * implement-a-ring-buffer-to-cater-for-tiny-input-buffers game, so just warn
+ * if the buffer size is too small, and silently drop any overspilling
+ * bytes. */
+ g_warn_if_fail (agent->reliable || buf_len >= MAX_BUFFER_SIZE);
+
+ if (!agent_find_component (agent, stream_id, component_id,
+ &stream, &component)) {
+ g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
+ "Invalid stream/component.");
+ goto done;
+ }
+
+ /* Set the component’s receive buffer. */
+ context = component_dup_io_context (component);
+ component_set_io_callback (component, NULL, NULL, buf, buf_len, &child_error);
+
+ /* Add the cancellable as a source. */
+ if (cancellable != NULL) {
+ cancellable_source = g_cancellable_source_new (cancellable);
+ g_source_set_callback (cancellable_source,
+ (GSourceFunc) nice_agent_recv_cancelled_cb, &child_error, NULL);
+ g_source_attach (cancellable_source, context);
+ }
+
+ /* Is there already pending data left over from having an I/O callback
+ * attached and switching to using nice_agent_recv()? This is a horrifically
+ * specific use case which I hope nobody ever tries. And yet, it still must be
+ * supported. */
+ g_mutex_lock (&component->io_mutex);
+
+ while (!received_enough &&
+ !g_queue_is_empty (&component->pending_io_messages)) {
+ IOCallbackData *data;
+ gsize copied_len;
+
+ g_assert (component->io_callback_id == 0);
+
+ data = g_queue_peek_head (&component->pending_io_messages);
+ copied_len = MIN (data->buf_len - data->offset,
+ component->recv_buf_len - component->recv_buf_valid_len);
+
+ memcpy (component->recv_buf + component->recv_buf_valid_len,
+ data->buf + data->offset, len);
+ component->recv_buf_valid_len += copied_len;
+
+ /* If we only managed to grab part of the buffer, leave the buffer in the
+ * queue and have another go at it later. */
+ if (copied_len < data->buf_len - data->offset) {
+ data->offset += copied_len;
+ } else {
+ g_queue_pop_head (&component->pending_io_messages);
+ io_callback_data_free (data);
+ }
+
+ received_enough =
+ ((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
+ (!agent->reliable && component->recv_buf_valid_len > 0));
+ }
+
+ g_mutex_unlock (&component->io_mutex);
+
+ /* Each iteration of the main context will either receive some data, a
+ * cancellation error or a socket error.
+ *
+ * In reliable mode, iterate the loop enough to receive exactly @buf_len
+ * bytes. In non-reliable mode, iterate the loop to receive a single message.
+ */
+ while (!received_enough && !error_reported) {
+ agent_unlock ();
+ g_main_context_iteration (context, TRUE);
+ agent_lock ();
+
+ received_enough =
+ ((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
+ (!agent->reliable && component->recv_buf_valid_len > 0));
+ error_reported = (child_error != NULL);
+ }
+
+ len = component->recv_buf_valid_len;
+ nice_debug ("%s: len: %" G_GSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
+ G_STRFUNC, len, buf_len);
+
+ /* Tidy up. */
+ if (cancellable_source != NULL) {
+ g_source_destroy (cancellable_source);
+ g_source_unref (cancellable_source);
+ }
+
+ component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
+ g_main_context_unref (context);
+
+ /* Handle errors and cancellations. */
+ if (!received_enough) {
+ g_assert (error_reported);
+ len = -1;
+ }
+
+done:
+ g_assert ((child_error != NULL) == (len == -1));
+ g_assert (len != 0);
+
+ if (child_error != NULL)
+ g_propagate_error (error, child_error);
+
+ agent_unlock ();
+
+ return len;
+}
NICEAPI_EXPORT gint
nice_agent_send (
@@ -2624,46 +2838,6 @@ nice_agent_dispose (GObject *object)
}
-
-typedef struct _IOCtx IOCtx;
-
-struct _IOCtx
-{
- GSource *source;
- NiceAgent *agent;
- Stream *stream;
- Component *component;
- NiceSocket *socket;
-};
-
-
-static IOCtx *
-io_ctx_new (
- NiceAgent *agent,
- Stream *stream,
- Component *component,
- NiceSocket *socket,
- GSource *source)
-{
- IOCtx *ctx;
-
- ctx = g_slice_new0 (IOCtx);
- ctx->agent = agent;
- ctx->stream = stream;
- ctx->component = component;
- ctx->socket = socket;
- ctx->source = source;
-
- return ctx;
-}
-
-
-static void
-io_ctx_free (IOCtx *ctx)
-{
- g_slice_free (IOCtx, ctx);
-}
-
gboolean
component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
{
@@ -2676,7 +2850,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
guint8 *recv_buf;
gsize recv_buf_len;
gboolean retval = FALSE;
- NiceAgentRecvFunc io_callback;
+ gboolean has_io_callback;
agent_lock ();
@@ -2690,17 +2864,14 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
goto done;
}
- /* FIXME: Compartmentalise this in component.c */
- g_mutex_lock (&component->io_mutex);
- io_callback = component->io_callback;
- g_mutex_unlock (&component->io_mutex);
+ has_io_callback = component_has_io_callback (component);
/* Choose which receive buffer to use. If we’re reading for
* nice_agent_attach_recv(), use a local static buffer. If we’re reading for
* nice_agent_recv(), use the buffer provided by the client. */
- g_assert (io_callback == NULL || component->recv_buf == NULL);
+ g_assert (!has_io_callback || component->recv_buf == NULL);
- if (io_callback != NULL) {
+ if (has_io_callback) {
recv_buf = local_buf;
recv_buf_len = sizeof (local_buf);
} else if (component->recv_buf != NULL) {
@@ -2739,7 +2910,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
}
/* Actual data to notify the client about. */
- if (io_callback != NULL) {
+ if (has_io_callback) {
component_emit_io_callback (component, recv_buf, len);
} else {
/* Data has been stored in the component’s receive buffer to be picked up
@@ -2784,7 +2955,7 @@ nice_agent_attach_recv (
/* Set the component’s I/O context. */
component_set_io_context (component, ctx);
- component_set_io_callback (component, func, data);
+ component_set_io_callback (component, func, data, NULL, 0, NULL);
ret = TRUE;
if (func) {
diff --git a/agent/agent.h b/agent/agent.h
index c2a39d1..64df422 100644
--- a/agent/agent.h
+++ b/agent/agent.h
@@ -676,6 +676,15 @@ nice_agent_attach_recv (
NiceAgentRecvFunc func,
gpointer data);
+gssize
+nice_agent_recv (
+ NiceAgent *agent,
+ guint stream_id,
+ guint component_id,
+ guint8 *buf,
+ gsize buf_len,
+ GCancellable *cancellable,
+ GError **error);
/**
* nice_agent_set_selected_pair:
diff --git a/agent/component.c b/agent/component.c
index 62ecd4f..1bf6c33 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -61,7 +61,7 @@ component_deschedule_io_callback (Component *component);
/* Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock. */
+ * component_set_io_context(), which holds the Component’s I/O lock. */
static void
socket_source_attach (SocketSource *socket_source, GMainContext *context)
{
@@ -129,7 +129,7 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
* will be updated when nice_agent_attach_recv() or nice_agent_recv() are
* called. */
component_set_io_context (component, NULL);
- component_set_io_callback (component, NULL, NULL);
+ component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
return component;
}
@@ -438,7 +438,7 @@ component_attach_socket (Component *component, NiceSocket *socket)
/* Reattaches socket handles of @component to the main context.
*
* Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock. */
+ * component_set_io_context(), which holds the Component’s I/O lock. */
static void
component_reattach_all_sockets (Component *component)
{
@@ -486,7 +486,7 @@ component_detach_socket (Component *component, NiceSocket *socket)
* sockets themselves untouched.
*
* Must *not* take the agent lock, since it’s called from within
- * component_set_io_callback(), which holds the Component’s I/O lock.
+ * component_set_io_context(), which holds the Component’s I/O lock.
*/
void
component_detach_all_sockets (Component *component)
@@ -511,6 +511,12 @@ component_free_socket_sources (Component *component)
component->socket_sources = NULL;
}
+GMainContext *
+component_dup_io_context (Component *component)
+{
+ return g_main_context_ref (component->ctx);
+}
+
/* If @context is %NULL, a fresh context is used, so component->ctx is always
* guaranteed to be non-%NULL. */
void
@@ -534,24 +540,40 @@ component_set_io_context (Component *component, GMainContext *context)
g_mutex_unlock (&component->io_mutex);
}
+/* (func, user_data) and (recv_buf, recv_buf_len) are mutually exclusive.
+ * At most one of the two must be specified; if both are NULL, the Component
+ * will not receive any data (i.e. reception is paused). */
void
component_set_io_callback (Component *component,
- NiceAgentRecvFunc func, gpointer user_data)
+ NiceAgentRecvFunc func, gpointer user_data,
+ guint8 *recv_buf, gsize recv_buf_len,
+ GError **error)
{
+ g_assert (func == NULL || recv_buf == NULL);
+ g_assert (recv_buf != NULL || recv_buf_len == 0);
+ g_assert (error == NULL || *error == NULL);
+
g_mutex_lock (&component->io_mutex);
if (func != NULL) {
component->io_callback = func;
component->io_user_data = user_data;
+ component->recv_buf = NULL;
+ component->recv_buf_len = 0;
component_schedule_io_callback (component);
} else {
component->io_callback = NULL;
component->io_user_data = NULL;
+ component->recv_buf = recv_buf;
+ component->recv_buf_len = recv_buf_len;
component_deschedule_io_callback (component);
}
+ component->recv_buf_valid_len = 0;
+ component->recv_buf_error = error;
+
g_mutex_unlock (&component->io_mutex);
}
diff --git a/agent/component.h b/agent/component.h
index 5186bc1..9879b46 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -150,7 +150,10 @@ struct _Component
/* I/O handling. The main context must always be non-NULL, and is used for all
* socket recv() operations. All io_callback emissions are invoked in this
- * context too. */
+ * context too.
+ *
+ * recv_buf and io_callback are mutually exclusive, but it is allowed for both
+ * to be NULL if the Component is not currently ready to receive data. */
GMutex io_mutex; /**< protects io_callback, io_user_data,
pending_io_messages and io_callback_id.
immutable: can be accessed without
@@ -159,8 +162,6 @@ struct _Component
taken before this one */
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
- GMainContext *ctx; /**< context for GSources for this
- component */
GQueue pending_io_messages; /**< queue of packets which have been
received but not passed to the client
in an I/O callback or recv() call yet.
@@ -168,6 +169,13 @@ struct _Component
IOCallbackData */
guint io_callback_id; /* GSource ID of the I/O callback */
+ GMainContext *ctx; /**< context for GSources for this
+ component */
+ guint8 *recv_buf; /**< unowned buffer for receiving into */
+ gsize recv_buf_len; /**< allocated size of recv_buf in bytes */
+ gsize recv_buf_valid_len; /**< length of valid data in recv_buf */
+ GError **recv_buf_error; /**< error information about failed reads */
+
NiceAgent *agent; /* unowned, immutable: can be accessed without holding the
* agent lock */
Stream *stream; /* unowned, immutable: can be accessed without holding the
@@ -212,11 +220,15 @@ component_detach_all_sockets (Component *component);
void
component_free_socket_sources (Component *component);
+GMainContext *
+component_dup_io_context (Component *component);
void
component_set_io_context (Component *component, GMainContext *context);
void
component_set_io_callback (Component *component,
- NiceAgentRecvFunc func, gpointer user_data);
+ NiceAgentRecvFunc func, gpointer user_data,
+ guint8 *recv_buf, gsize recv_buf_len,
+ GError **error);
void
component_emit_io_callback (Component *component,
const guint8 *buf, gsize buf_len);