summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilip Withnall <philip.withnall@collabora.co.uk>2014-01-16 16:06:30 +0000
committerOlivier Crête <olivier.crete@collabora.com>2014-01-31 01:49:07 -0500
commit253be34806bee232df55f2a92609f74a015797da (patch)
treed87125595c38a584ee7a422d9290245a30dadaab
parent9661150dcda7939d108e583e7681352f99e581c8 (diff)
downloadlibnice-253be34806bee232df55f2a92609f74a015797da.tar.gz
agent: Add support for vectored I/O for receives
Add two new public functions: • nice_agent_recv_messages() • nice_agent_recv_messages_nonblocking() which allow receiving multiple messages in a single call, and support vectors of buffers to receive the messages into. The existing nice_agent_recv[_nonblocking]() APIs have been left untouched. This tidies up a lot of the message handling code internally, and eliminates a couple of memcpy()s. There are still a few more memcpy()s on the critical path, which could be eliminated with further work. In the reliable agent case, every message is memcpy()ed twice: once into the pseudo-TCP receive buffer, and once out of it. The copy on input could be eliminated (in the case of in-order delivery of packets) by receiving directly into the receive buffer. The copy on output can’t be eliminated except in the I/O callback case (when nice_agent_attach_recv() has been used), in which case the callback could be invoked with a pointer directly into the pseudo-TCP receive buffer. In the non-reliable agent case, zero memcpy()s are used. A couple of the more complex socket implementations (TURN and HTTP) have slow paths during setup, and partially also during normal use. These could be optimised further, and FIXME comments have been added.
-rw-r--r--agent/agent-priv.h36
-rw-r--r--agent/agent.c921
-rw-r--r--agent/agent.h34
-rw-r--r--agent/component.c26
-rw-r--r--agent/component.h18
-rw-r--r--agent/pseudotcp.c68
-rw-r--r--agent/pseudotcp.h18
-rw-r--r--docs/reference/libnice/libnice-sections.txt2
-rw-r--r--nice/libnice.sym2
-rw-r--r--socket/turn.c39
-rw-r--r--socket/turn.h4
11 files changed, 853 insertions, 315 deletions
diff --git a/agent/agent-priv.h b/agent/agent-priv.h
index b8095da..4e10c4f 100644
--- a/agent/agent-priv.h
+++ b/agent/agent-priv.h
@@ -52,11 +52,44 @@
#include <glib.h>
#include "agent.h"
+
+/**
+ * NiceInputMessageIter:
+ * @message: index of the message currently being written into
+ * @buffer: index of the buffer currently being written into
+ * @offset: byte offset into the buffer
+ *
+ * Iterator for sequentially writing into an array of #NiceInputMessages,
+ * tracking the current write position (i.e. the index of the next byte to be
+ * written).
+ *
+ * If @message is equal to the number of messages in the associated
+ * #NiceInputMessage array, and @buffer and @offset are zero, the iterator is at
+ * the end of the messages array, and the array is (presumably) full.
+ *
+ * Since: 0.1.5
+ */
+typedef struct {
+ guint message;
+ guint buffer;
+ gsize offset;
+} NiceInputMessageIter;
+
+void
+nice_input_message_iter_reset (NiceInputMessageIter *iter);
+gboolean
+nice_input_message_iter_is_at_end (NiceInputMessageIter *iter,
+ NiceInputMessage *messages, guint n_messages);
+guint
+nice_input_message_iter_get_n_valid_messages (NiceInputMessageIter *iter);
+
+
#include "socket.h"
#include "candidate.h"
#include "stream.h"
#include "conncheck.h"
#include "component.h"
+#include "random.h"
#include "stun/stunagent.h"
#include "stun/usages/turn.h"
#include "stun/usages/ice.h"
@@ -178,9 +211,6 @@ component_io_cb (
GIOCondition condition,
gpointer data);
-gssize agent_recv_locked (NiceAgent *agent, Stream *stream,
- Component *component, NiceSocket *socket, guint8 *buf, gsize buf_len);
-
gsize
memcpy_buffer_to_input_message (NiceInputMessage *message,
const guint8 *buffer, gsize buffer_length);
diff --git a/agent/agent.c b/agent/agent.c
index b4e38cb..695767e 100644
--- a/agent/agent.c
+++ b/agent/agent.c
@@ -83,6 +83,9 @@
#define MAX_TCP_MTU 1400 /* Use 1400 because of VPNs and we assume IEE 802.3 */
+static void
+nice_debug_message_composition (NiceInputMessage *messages, guint n_messages);
+
G_DEFINE_TYPE (NiceAgent, nice_agent, G_TYPE_OBJECT);
enum
@@ -1025,6 +1028,73 @@ pseudo_tcp_socket_opened (PseudoTcpSocket *sock, gpointer user_data)
stream->id, component->id);
}
+/* Will fill up @messages from the first free byte onwards (as determined using
+ * @iter). This is always used in reliable mode, so it essentially treats
+ * @messages as a massive flat array of buffers.
+ *
+ * Updates @iter in place. @iter and @messages are left in invalid states if
+ * an error is returned.
+ *
+ * Returns the number of valid messages in @messages on success (which may be
+ * zero if reading into the first buffer of the message would have blocked), or
+ * a negative number on error. */
+static gint
+pseudo_tcp_socket_recv_messages (PseudoTcpSocket *self,
+ NiceInputMessage *messages, guint n_messages, NiceInputMessageIter *iter,
+ GError **error)
+{
+ for (; iter->message < n_messages; iter->message++) {
+ NiceInputMessage *message = &messages[iter->message];
+
+ if (iter->buffer == 0 && iter->offset == 0) {
+ message->length = 0;
+ }
+
+ for (;
+ (message->n_buffers >= 0 && iter->buffer < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[iter->buffer].buffer != NULL);
+ iter->buffer++) {
+ GInputVector *buffer = &message->buffers[iter->buffer];
+
+ do {
+ gssize len;
+
+ len = pseudo_tcp_socket_recv (self,
+ (gchar *) buffer->buffer + iter->offset,
+ buffer->size - iter->offset);
+
+ nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes into "
+ "buffer %p (offset %" G_GSIZE_FORMAT ", length %" G_GSIZE_FORMAT
+ ").", G_STRFUNC, len, buffer->buffer, iter->offset, buffer->size);
+
+ if (len < 0 && pseudo_tcp_socket_get_error (self) == EWOULDBLOCK) {
+ len = 0;
+ goto done;
+ } else if (len < 0 && pseudo_tcp_socket_get_error (self) == ENOTCONN) {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+ "Error reading data from pseudo-TCP socket: not connected.");
+ return len;
+ } else if (len < 0) {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "Error reading data from pseudo-TCP socket.");
+ return len;
+ } else {
+ /* Got some data! */
+ message->length += len;
+ iter->offset += len;
+ }
+ } while (iter->offset < buffer->size);
+
+ iter->offset = 0;
+ }
+
+ iter->buffer = 0;
+ }
+
+done:
+ return nice_input_message_iter_get_n_valid_messages (iter);
+}
+
/* This is called with the agent lock held. */
static void
pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
@@ -1032,8 +1102,6 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
Component *component = user_data;
NiceAgent *agent = component->agent;
Stream *stream = component->stream;
- guint8 buf[MAX_BUFFER_SIZE];
- gssize len;
gboolean has_io_callback;
nice_debug ("Agent %p: s%d:%d pseudo Tcp socket readable", agent,
@@ -1045,59 +1113,81 @@ pseudo_tcp_socket_readable (PseudoTcpSocket *sock, gpointer user_data)
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *)&agent);
has_io_callback = component_has_io_callback (component);
- do {
- /* Only dequeue pseudo-TCP data if we can reliably inform the client. The
- * agent lock is held here, so has_io_callback can only change during
- * component_emit_io_callback(), after which it’s re-queried. This ensures
- * no data loss of packets already received and dequeued. */
- if (has_io_callback) {
+ /* Only dequeue pseudo-TCP data if we can reliably inform the client. The
+ * agent lock is held here, so has_io_callback can only change during
+ * component_emit_io_callback(), after which it’s re-queried. This ensures
+ * no data loss of packets already received and dequeued. */
+ if (has_io_callback) {
+ do {
+ guint8 buf[MAX_BUFFER_SIZE];
+ gssize len;
+
+ /* FIXME: Why copy into a temporary buffer here? Why can’t the I/O
+ * callbacks be emitted directly from the pseudo-TCP receive buffer? */
len = pseudo_tcp_socket_recv (sock, (gchar *) buf, sizeof(buf));
- } else if (component->recv_buf != NULL) {
- len = pseudo_tcp_socket_recv (sock,
- (gchar *) component->recv_buf + component->recv_buf_valid_len,
- component->recv_buf_len - component->recv_buf_valid_len);
- } else {
- len = 0;
- }
- nice_debug ("%s: received %" G_GSSIZE_FORMAT " bytes", G_STRFUNC, len);
+ nice_debug ("%s: I/O callback case: Received %" G_GSSIZE_FORMAT " bytes",
+ G_STRFUNC, len);
+
+ if (len == 0) {
+ component->tcp_readable = FALSE;
+ break;
+ } else if (len <= 0) {
+ /* Handle errors. */
+ if (pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) {
+ nice_debug ("%s: calling priv_pseudo_tcp_error()", G_STRFUNC);
+ priv_pseudo_tcp_error (agent, stream, component);
+
+ if (component->recv_buf_error != NULL) {
+ GIOErrorEnum error_code;
+
+ if (pseudo_tcp_socket_get_error (sock) == ENOTCONN)
+ error_code = G_IO_ERROR_BROKEN_PIPE;
+ else
+ error_code = G_IO_ERROR_FAILED;
+
+ g_set_error (component->recv_buf_error, G_IO_ERROR, error_code,
+ "Error reading data from pseudo-TCP socket.");
+ }
+ }
+
+ break;
+ }
- if (len > 0 && has_io_callback) {
component_emit_io_callback (component, buf, len);
+
if (sock == NULL) {
nice_debug ("PseudoTCP socket got destroyed in readable callback!");
break;
}
- } else if (len > 0 && component->recv_buf != NULL) {
- /* No callback to call. The data has been copied directly into the
- * client’s receive buffer. */
- component->recv_buf_valid_len += len;
- } else if (len < 0 &&
- pseudo_tcp_socket_get_error (sock) != EWOULDBLOCK) {
- /* Signal error */
+
+ has_io_callback = component_has_io_callback (component);
+ } while (has_io_callback);
+ } else if (component->recv_messages != NULL) {
+ gint n_valid_messages;
+
+ /* Fill up every buffer in every message until the connection closes or an
+ * error occurs. Copy the data directly into the client’s receive message
+ * array without making any callbacks. Update component->recv_messages_iter
+ * as we go. */
+ n_valid_messages = pseudo_tcp_socket_recv_messages (sock,
+ component->recv_messages, component->n_recv_messages,
+ &component->recv_messages_iter, component->recv_buf_error);
+
+ nice_debug ("%s: Client buffers case: Received %d valid messages:",
+ G_STRFUNC, n_valid_messages);
+ nice_debug_message_composition (component->recv_messages,
+ component->n_recv_messages);
+
+ if (n_valid_messages < 0) {
nice_debug ("%s: calling priv_pseudo_tcp_error()", G_STRFUNC);
priv_pseudo_tcp_error (agent, stream, component);
-
- if (component->recv_buf != NULL) {
- GIOErrorEnum error_code;
-
- if (pseudo_tcp_socket_get_error (sock) == ENOTCONN)
- error_code = G_IO_ERROR_BROKEN_PIPE;
- else
- error_code = G_IO_ERROR_FAILED;
-
- g_set_error (component->recv_buf_error, G_IO_ERROR, error_code,
- "Error reading data from pseudo-TCP socket.");
- }
- } else if (len < 0 &&
- pseudo_tcp_socket_get_error (sock) == EWOULDBLOCK){
+ } else if (n_valid_messages == 0) {
component->tcp_readable = FALSE;
}
-
- has_io_callback = component_has_io_callback (component);
- } while (len > 0 &&
- (has_io_callback ||
- component->recv_buf_valid_len < component->recv_buf_len));
+ } else {
+ nice_debug ("%s: no data read", G_STRFUNC);
+ }
if (agent) {
adjust_tcp_clock (agent, stream, component);
@@ -2405,68 +2495,76 @@ nice_agent_set_remote_candidates (NiceAgent *agent, guint stream_id, guint compo
return added;
}
+/* Return values for agent_recv_message_unlocked(). Needed purely because it
+ * must differentiate between RECV_OOB and RECV_SUCCESS. */
+typedef enum {
+ RECV_ERROR = -2,
+ RECV_WOULD_BLOCK = -1,
+ RECV_OOB = 0,
+ RECV_SUCCESS = 1,
+} RecvStatus;
+
/*
- * agent_recv_locked:
+ * agent_recv_message_unlocked:
* @agent: a #NiceAgent
* @stream: the stream to receive from
* @component: the component to receive from
* @socket: the socket to receive on
- * @buf: the buffer to write into (must be at least @buf_len bytes long)
- * @buf_len: the length of @buf
+ * @message: the message to write into (must have at least 65536 bytes of buffer
+ * space)
*
- * Receive up to @buf_len bytes of data from the given
- * @stream/@component/@socket, in a non-blocking fashion. If the socket is a
- * datagram socket and @buf_len is not big enough to hold an entire packet, the
- * remaining bytes of the packet will be silently dropped.
+ * Receive a single message of data from the given @stream, @component and
+ * @socket tuple, in a non-blocking fashion. The caller must ensure that
+ * @message contains enough buffers to provide at least 65536 bytes of buffer
+ * space, but the buffers may be split as the caller sees fit.
*
- * NOTE: Must be called with the agent’s lock held.
+ * This must be called with the agent’s lock held.
*
- * Returns: number of bytes stored in @buf, 0 if no data is available, or -1 on
- * error
+ * Returns: number of valid messages received on success (i.e. %RECV_SUCCESS or
+ * 1), %RECV_OOB if data was successfully received but was handled out-of-band
+ * (e.g. due to being a STUN control packet), %RECV_WOULD_BLOCK if no data is
+ * available and the call would block, or %RECV_ERROR on error
*/
-gssize
-agent_recv_locked (
+static RecvStatus
+agent_recv_message_unlocked (
NiceAgent *agent,
Stream *stream,
Component *component,
NiceSocket *socket,
- guint8 *buf,
- gsize buf_len)
+ NiceInputMessage *message)
{
NiceAddress from;
- gssize len;
GList *item;
- guint8 local_buf[MAX_BUFFER_SIZE];
- gsize local_buf_len = MAX_BUFFER_SIZE;
- GInputVector local_bufs = { local_buf, local_buf_len };
- NiceInputMessage local_messages = { &local_bufs, 1, &from, 0 };
- gint n_valid_messages;
+ gint retval;
- /* Returns -1 on error, 0 on EWOULDBLOCK, and > 0 on success.
- *
- * FIXME: We have to receive into a local buffer then copy out because
- * otherwise, if @buf is too small, we could lose data, even when in
- * reliable mode (because reliable streams are packetised). */
- n_valid_messages = nice_socket_recv_messages (socket, &local_messages, 1);
+ /* We need an address for packet parsing, below. */
+ if (message->from == NULL) {
+ message->from = &from;
+ }
- len = (n_valid_messages == 1) ?
- (gssize) local_messages.length : n_valid_messages;
+ retval = nice_socket_recv_messages (socket, message, 1);
- if (len == 0) {
- return 0;
- } else if (len < 0) {
- nice_debug ("Agent %p: %s returned %" G_GSSIZE_FORMAT ", errno (%d) : %s",
- agent, G_STRFUNC, len, errno, g_strerror (errno));
+ nice_debug ("%s: Received %d valid messages of length %" G_GSIZE_FORMAT
+ " from base socket %p.", G_STRFUNC, retval, message->length, socket);
+
+ if (retval == 0) {
+ retval = RECV_WOULD_BLOCK; /* EWOULDBLOCK */
+ goto done;
+ } else if (retval < 0) {
+ nice_debug ("Agent %p: %s returned %d, errno (%d) : %s",
+ agent, G_STRFUNC, retval, errno, g_strerror (errno));
- return len;
+ retval = RECV_ERROR;
+ goto done;
}
#ifndef NDEBUG
- if (len > 0) {
+ if (message->length > 0) {
gchar tmpbuf[INET6_ADDRSTRLEN];
- nice_address_to_string (&from, tmpbuf);
+ nice_address_to_string (message->from, tmpbuf);
nice_debug ("Agent %p : Packet received on local socket %d from [%s]:%u (%" G_GSSIZE_FORMAT " octets).", agent,
- g_socket_get_fd (socket->fileno), tmpbuf, nice_address_get_port (&from), len);
+ g_socket_get_fd (socket->fileno), tmpbuf,
+ nice_address_get_port (message->from), message->length);
}
#endif
@@ -2474,7 +2572,7 @@ agent_recv_locked (
TurnServer *turn = item->data;
GSList *i = NULL;
- if (!nice_address_equal (&from, &turn->server))
+ if (!nice_address_equal (message->from, &turn->server))
continue;
#ifndef NDEBUG
@@ -2488,8 +2586,7 @@ agent_recv_locked (
if (cand->type == NICE_CANDIDATE_TYPE_RELAYED &&
cand->stream_id == stream->id &&
cand->component_id == component->id) {
- len = nice_turn_socket_parse_recv (cand->sockptr, &socket,
- &from, len, (gchar *) local_buf, &from, (gchar *) local_buf, len);
+ nice_turn_socket_parse_recv_message (cand->sockptr, &socket, message);
}
}
}
@@ -2498,17 +2595,39 @@ agent_recv_locked (
/* If the message’s stated length is equal to its actual length, it’s probably
* a STUN message; otherwise it’s probably data. */
- if (stun_message_validate_buffer_length ((uint8_t *) local_buf, (size_t) len,
+ if (stun_message_validate_buffer_length_fast (
+ (StunInputVector *) message->buffers, message->n_buffers, message->length,
(agent->compatibility != NICE_COMPATIBILITY_OC2007 &&
- agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) == len &&
- conn_check_handle_inbound_stun (agent, stream, component, socket,
- &from, (gchar *) local_buf, len)) {
- /* Handled STUN message. */
- return 0;
+ agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) == (ssize_t) message->length) {
+ /* Slow path: If this message isn’t obviously *not* a STUN packet, compact
+ * its buffers
+ * into a single monolithic one and parse the packet properly. */
+ guint8 *big_buf;
+ gsize big_buf_len;
+
+ big_buf = compact_input_message (message, &big_buf_len);
+
+ if (stun_message_validate_buffer_length (big_buf, big_buf_len,
+ (agent->compatibility != NICE_COMPATIBILITY_OC2007 &&
+ agent->compatibility != NICE_COMPATIBILITY_OC2007R2)) == (gint) big_buf_len &&
+ conn_check_handle_inbound_stun (agent, stream, component, socket,
+ message->from, (gchar *) big_buf, big_buf_len)) {
+ /* Handled STUN message. */
+ nice_debug ("%s: Valid STUN packet received.", G_STRFUNC);
+
+ retval = RECV_OOB;
+ g_free (big_buf);
+ goto done;
+ }
+
+ nice_debug ("%s: WARNING: Packet passed fast STUN validation but failed "
+ "slow validation.", G_STRFUNC);
+
+ g_free (big_buf);
}
/* Unhandled STUN; try handling TCP data, then pass to the client. */
- if (len > 0 && component->tcp) {
+ if (message->length > 0 && component->tcp) {
/* If we don’t yet have an underlying selected socket, queue up the incoming
* data to handle later. This is because we can’t send ACKs (or, more
* importantly for the first few packets, SYNACKs) without an underlying
@@ -2518,11 +2637,10 @@ agent_recv_locked (
* machine. */
if (component->selected_pair.local == NULL) {
GOutputVector *vec = g_slice_new (GOutputVector);
- vec->buffer = g_memdup (local_buf, len);
- vec->size = len;
+ vec->buffer = compact_input_message (message, &vec->size);
g_queue_push_tail (&component->queued_tcp_packets, vec);
nice_debug ("%s: Queued %" G_GSSIZE_FORMAT " bytes for agent %p.",
- G_STRFUNC, len, agent);
+ G_STRFUNC, vec->size, agent);
return 0;
} else {
@@ -2532,9 +2650,9 @@ agent_recv_locked (
/* Received data on a reliable connection. */
g_object_add_weak_pointer (G_OBJECT (agent), (gpointer *) &agent);
- nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSSIZE_FORMAT,
- G_STRFUNC, len);
- pseudo_tcp_socket_notify_packet (component->tcp, (gchar *) local_buf, len);
+ nice_debug ("%s: notifying pseudo-TCP of packet, length %" G_GSIZE_FORMAT,
+ G_STRFUNC, message->length);
+ pseudo_tcp_socket_notify_message (component->tcp, message);
if (agent) {
adjust_tcp_clock (agent, stream, component);
@@ -2543,22 +2661,24 @@ agent_recv_locked (
nice_debug ("Our agent got destroyed in notify_packet!!");
}
- /* Data’s already been handled, so return 0. */
- return 0;
- } else if (len > 0 && !component->tcp && agent->reliable) {
+ /* Success! Handled out-of-band. */
+ retval = RECV_OOB;
+ goto done;
+ } else if (message->length > 0 && !component->tcp && agent->reliable) {
/* Received data on a reliable connection which has no TCP component. */
nice_debug ("Received data on a pseudo tcp FAILED component. Ignoring.");
- return 0;
+ retval = RECV_OOB;
+ goto done;
}
- /* Yay for poor performance! */
- if (len >= 0) {
- len = MIN (buf_len, (gsize) len);
- memcpy (buf, local_buf, len);
+done:
+ /* Clear local modifications. */
+ if (message->from == &from) {
+ message->from = NULL;
}
- return len;
+ return retval;
}
/* Print the composition of an array of messages. No-op if debugging is
@@ -2661,6 +2781,137 @@ memcpy_buffer_to_input_message (NiceInputMessage *message,
return message->length;
}
+/**
+ * nice_input_message_iter_reset:
+ * @iter: a #NiceInputMessageIter
+ *
+ * Reset the given @iter to point to the beginning of the array of messages.
+ * This may be used both to initialise it and to reset it after use.
+ *
+ * Since: 0.1.5
+ */
+void
+nice_input_message_iter_reset (NiceInputMessageIter *iter)
+{
+ iter->message = 0;
+ iter->buffer = 0;
+ iter->offset = 0;
+}
+
+/**
+ * nice_input_message_iter_is_at_end:
+ * @iter: a #NiceInputMessageIter
+ * @messages: (array length=n_messages): an array of #NiceInputMessages
+ * @n_messages: number of entries in @messages
+ *
+ * Determine whether @iter points to the end of the given @messages array. If it
+ * does, the array is full: every buffer in every message is full of valid
+ * bytes.
+ *
+ * Returns: %TRUE if the messages’ buffers are full, %FALSE otherwise
+ *
+ * Since: 0.1.5
+ */
+gboolean
+nice_input_message_iter_is_at_end (NiceInputMessageIter *iter,
+ NiceInputMessage *messages, guint n_messages)
+{
+ return (iter->message == n_messages &&
+ iter->buffer == 0 && iter->offset == 0);
+}
+
+/**
+ * nice_input_message_iter_get_n_valid_messages:
+ * @iter: a #NiceInputMessageIter
+ *
+ * Calculate the number of valid messages in the messages array. A valid message
+ * is one which contains at least one valid byte of data in its buffers.
+ *
+ * Returns: number of valid messages (may be zero)
+ *
+ * Since: 0.1.5
+ */
+guint
+nice_input_message_iter_get_n_valid_messages (NiceInputMessageIter *iter)
+{
+ if (iter->buffer == 0 && iter->offset == 0)
+ return iter->message;
+ else
+ return iter->message + 1;
+}
+
+/* Will fill up @messages from the first free byte onwards (as determined using
+ * @iter). This may be used in reliable or non-reliable mode; in non-reliable
+ * mode it will always increment the message index after each buffer is
+ * consumed.
+ *
+ * Updates @iter in place. No errors can occur.
+ *
+ * Returns the number of valid messages in @messages on success (which may be
+ * zero if reading into the first buffer of the message would have blocked).
+ *
+ * Must be called with the io_mutex held. */
+static gint
+pending_io_messages_recv_messages (Component *component, gboolean reliable,
+ NiceInputMessage *messages, guint n_messages, NiceInputMessageIter *iter)
+{
+ gsize len;
+ IOCallbackData *data;
+ NiceInputMessage *message = &messages[iter->message];
+
+ g_assert (component->io_callback_id == 0);
+
+ data = g_queue_peek_head (&component->pending_io_messages);
+ if (data == NULL)
+ goto done;
+
+ if (iter->buffer == 0 && iter->offset == 0) {
+ message->length = 0;
+ }
+
+ for (;
+ (message->n_buffers >= 0 && iter->buffer < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[iter->buffer].buffer != NULL);
+ iter->buffer++) {
+ GInputVector *buffer = &message->buffers[iter->buffer];
+
+ do {
+ len = MIN (data->buf_len - data->offset, buffer->size - iter->offset);
+ memcpy ((guint8 *) buffer->buffer + iter->offset,
+ data->buf + data->offset, len);
+
+ nice_debug ("%s: Unbuffered %" G_GSIZE_FORMAT " bytes into "
+ "buffer %p (offset %" G_GSIZE_FORMAT ", length %" G_GSIZE_FORMAT
+ ").", G_STRFUNC, len, buffer->buffer, iter->offset, buffer->size);
+
+ message->length += len;
+ iter->offset += len;
+ data->offset += len;
+ } while (iter->offset < buffer->size);
+
+ iter->offset = 0;
+ }
+
+ /* Only if we managed to consume the whole buffer should it be popped off the
+ * queue; otherwise we’ll have another go at it later. */
+ if (data->offset == data->buf_len) {
+ g_queue_pop_head (&component->pending_io_messages);
+ io_callback_data_free (data);
+
+ /* If we’ve consumed an entire message from pending_io_messages, and
+ * are in non-reliable mode, move on to the next message in
+ * @messages. */
+ if (!reliable) {
+ iter->offset = 0;
+ iter->buffer = 0;
+ iter->message++;
+ }
+ }
+
+done:
+ return nice_input_message_iter_get_n_valid_messages (iter);
+}
+
static gboolean
nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
{
@@ -2668,15 +2919,16 @@ nice_agent_recv_cancelled_cb (GCancellable *cancellable, gpointer user_data)
return !g_cancellable_set_error_if_cancelled (cancellable, error);
}
-static gssize
-nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
- guint component_id, gboolean blocking, guint8 *buf, gsize buf_len,
+static gint
+nice_agent_recv_messages_blocking_or_nonblocking (NiceAgent *agent,
+ guint stream_id, guint component_id, gboolean blocking,
+ NiceInputMessage *messages, guint n_messages,
GCancellable *cancellable, GError **error)
{
GMainContext *context;
Stream *stream;
Component *component;
- gssize len = -1;
+ gint n_valid_messages = -1;
GSource *cancellable_source = NULL;
gboolean received_enough = FALSE, error_reported = FALSE;
gboolean all_sockets_would_block = FALSE;
@@ -2685,22 +2937,16 @@ nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
g_return_val_if_fail (NICE_IS_AGENT (agent), -1);
g_return_val_if_fail (stream_id >= 1, -1);
g_return_val_if_fail (component_id >= 1, -1);
- g_return_val_if_fail (buf != NULL, -1);
+ g_return_val_if_fail (n_messages == 0 || messages != NULL, -1);
g_return_val_if_fail (
cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
g_return_val_if_fail (error == NULL || *error == NULL, -1);
- if (buf_len == 0)
+ if (n_messages == 0)
return 0;
agent_lock ();
- /* We’re not going to do the
- * implement-a-ring-buffer-to-cater-for-tiny-input-buffers game, so just warn
- * if the buffer size is too small, and silently drop any overspilling
- * bytes. */
- g_warn_if_fail (agent->reliable || buf_len >= MAX_BUFFER_SIZE);
-
if (!agent_find_component (agent, stream_id, component_id,
&stream, &component)) {
g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE,
@@ -2708,9 +2954,13 @@ nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
goto done;
}
+ nice_debug ("%s: (%s):", G_STRFUNC, blocking ? "blocking" : "non-blocking");
+ nice_debug_message_composition (messages, n_messages);
+
/* Set the component’s receive buffer. */
context = component_dup_io_context (component);
- component_set_io_callback (component, NULL, NULL, buf, buf_len, &child_error);
+ component_set_io_callback (component, NULL, NULL, messages, n_messages,
+ &child_error);
/* Add the cancellable as a source. */
if (cancellable != NULL) {
@@ -2728,31 +2978,13 @@ nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
while (!received_enough &&
!g_queue_is_empty (&component->pending_io_messages)) {
- IOCallbackData *data;
- gsize copied_len;
-
- g_assert (component->io_callback_id == 0);
-
- data = g_queue_peek_head (&component->pending_io_messages);
- copied_len = MIN (data->buf_len - data->offset,
- component->recv_buf_len - component->recv_buf_valid_len);
-
- memcpy (component->recv_buf + component->recv_buf_valid_len,
- data->buf + data->offset, len);
- component->recv_buf_valid_len += copied_len;
-
- /* If we only managed to grab part of the buffer, leave the buffer in the
- * queue and have another go at it later. */
- if (copied_len < data->buf_len - data->offset) {
- data->offset += copied_len;
- } else {
- g_queue_pop_head (&component->pending_io_messages);
- io_callback_data_free (data);
- }
+ pending_io_messages_recv_messages (component, agent->reliable,
+ component->recv_messages, component->n_recv_messages,
+ &component->recv_messages_iter);
received_enough =
- ((agent->reliable && component->recv_buf_valid_len >= buf_len) ||
- (!agent->reliable && component->recv_buf_valid_len > 0));
+ nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+ component->recv_messages, component->n_recv_messages);
}
g_mutex_unlock (&component->io_mutex);
@@ -2761,58 +2993,52 @@ nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
* before trying the sockets. */
if (agent->reliable && component->tcp != NULL &&
pseudo_tcp_socket_get_available_bytes (component->tcp) > 0) {
- len = pseudo_tcp_socket_recv (component->tcp, (gchar *) component->recv_buf,
- component->recv_buf_len);
+ pseudo_tcp_socket_recv_messages (component->tcp,
+ component->recv_messages, component->n_recv_messages,
+ &component->recv_messages_iter, &child_error);
adjust_tcp_clock (agent, stream, component);
- nice_debug ("%s: Received %" G_GSSIZE_FORMAT " bytes from pseudo-TCP read "
- "buffer.", G_STRFUNC, len);
+ nice_debug ("%s: Received %d valid messages from pseudo-TCP read buffer.",
+ G_STRFUNC, n_valid_messages);
- if (len < 0 &&
- pseudo_tcp_socket_get_error (component->tcp) == EWOULDBLOCK) {
- len = 0;
- } else if (len < 0 &&
- pseudo_tcp_socket_get_error (component->tcp) == ENOTCONN) {
- g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
- "Error reading data from pseudo-TCP socket: not connected.");
- } else if (len < 0) {
- g_set_error (&child_error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Error reading data from pseudo-TCP socket.");
- } else if (len > 0) {
- /* Got some data! */
- component->recv_buf_valid_len += len;
- }
-
- received_enough = (component->recv_buf_valid_len == buf_len);
+ received_enough =
+ nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+ component->recv_messages, component->n_recv_messages);
error_reported = (child_error != NULL);
}
/* Each iteration of the main context will either receive some data, a
- * cancellation error or a socket error.
+ * cancellation error or a socket error. In non-reliable mode, the iter’s
+ * @message counter will be incremented after each read.
*
- * In blocking, reliable mode, iterate the loop enough to receive exactly
- * @buf_len bytes. In blocking, non-reliable mode, iterate the loop to receive
- * a single message. In non-blocking mode, stop iterating the loop if all
- * sockets would block (i.e. if no data was received for an iteration).
+ * In blocking, reliable mode, iterate the loop enough to fill exactly
+ * @n_messages messages. In blocking, non-reliable mode, iterate the loop to
+ * receive @n_messages messages (which may not fill all the buffers). In
+ * non-blocking mode, stop iterating the loop if all sockets would block (i.e.
+ * if no data was received for an iteration).
*/
while (!received_enough && !error_reported && !all_sockets_would_block) {
- gsize prev_recv_buf_valid_len = component->recv_buf_valid_len;
+ NiceInputMessageIter prev_recv_messages_iter;
+
+ memcpy (&prev_recv_messages_iter, &component->recv_messages_iter,
+ sizeof (NiceInputMessageIter));
agent_unlock ();
g_main_context_iteration (context, blocking);
agent_lock ();
received_enough =
- ((agent->reliable && component->recv_buf_valid_len == buf_len) ||
- (!agent->reliable && component->recv_buf_valid_len > 0));
+ nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+ component->recv_messages, component->n_recv_messages);
error_reported = (child_error != NULL);
- all_sockets_would_block =
- !blocking && (component->recv_buf_valid_len == prev_recv_buf_valid_len);
+ all_sockets_would_block = (!blocking &&
+ memcmp (&prev_recv_messages_iter, &component->recv_messages_iter,
+ sizeof (NiceInputMessageIter)) == 0);
}
- len = component->recv_buf_valid_len;
- nice_debug ("%s: len: %" G_GSIZE_FORMAT ", buf_len: %" G_GSIZE_FORMAT,
- G_STRFUNC, len, buf_len);
+ n_valid_messages =
+ nice_input_message_iter_get_n_valid_messages (
+ &component->recv_messages_iter); /* grab before resetting the iter */
/* Tidy up. */
if (cancellable_source != NULL) {
@@ -2825,59 +3051,99 @@ nice_agent_recv_blocking_or_nonblocking (NiceAgent *agent, guint stream_id,
/* Handle errors and cancellations. */
if (error_reported) {
- len = -1;
- } else if (len == 0 && all_sockets_would_block) {
+ n_valid_messages = -1;
+ } else if (n_valid_messages == 0 && all_sockets_would_block) {
g_set_error_literal (&child_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
- len = -1;
+ n_valid_messages = -1;
}
+ nice_debug ("%s: n_valid_messages: %d, n_messages: %u", G_STRFUNC,
+ n_valid_messages, n_messages);
+
done:
- g_assert ((child_error != NULL) == (len == -1));
- g_assert (len != 0);
- g_assert (len < 0 || (gsize) len <= buf_len);
+ g_assert ((child_error != NULL) == (n_valid_messages == -1));
+ g_assert (n_valid_messages != 0);
+ g_assert (n_valid_messages < 0 || (guint) n_valid_messages <= n_messages);
if (child_error != NULL)
g_propagate_error (error, child_error);
agent_unlock ();
- return len;
+ return n_valid_messages;
}
/**
- * nice_agent_recv:
+ * nice_agent_recv_messages:
* @agent: a #NiceAgent
* @stream_id: the ID of the stream to receive on
* @component_id: the ID of the component to receive on
- * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
- * to write the received data into, of length at least @buf_len
- * @buf_len: length of @buf
+ * @messages: (array length=n_messages) (out caller-allocates): caller-allocated
+ * array of #NiceInputMessages to write the received messages into, of length at
+ * least @n_messages
+ * @n_messages: number of entries in @messages
* @cancellable: (allow-none): a #GCancellable to allow the operation to be
* cancelled from another thread, or %NULL
* @error: (allow-none): return location for a #GError, or %NULL
*
* Block on receiving data from the given stream/component combination on
- * @agent, returning only once at least 1 byte has been received and written
- * into @buf, the stream is closed by the other end or by calling
- * nice_agent_remove_stream(), or @cancellable is cancelled.
+ * @agent, returning only once exactly @n_messages messages have been received
+ * and written into @messages, the stream is closed by the other end or by
+ * calling nice_agent_remove_stream(), or @cancellable is cancelled.
+ *
+ * In the non-error case, in reliable mode, this will block until all buffers in
+ * all @n_messages have been filled with received data (i.e. @messages is
+ * treated as a large, flat array of buffers). In non-reliable mode, it will
+ * block until @n_messages messages have been received, each of which does not
+ * have to fill all the buffers in its #NiceInputMessage. In the non-reliable
+ * case, each #NiceInputMessage must have enough buffers to contain an entire
+ * message (65536 bytes), or any excess data may be silently dropped.
*
- * In the non-error case, in reliable mode, this will block until exactly
- * @buf_len bytes have been received. In non-reliable mode, it will block until
- * a single message has been received. In this case, @buf must be big enough to
- * contain an entire message (65535 bytes), or any excess data may be silently
- * dropped.
+ * For each received message, #NiceInputMessage::length will be set to the
+ * number of valid bytes stored in the message’s buffers. The bytes are stored
+ * sequentially in the buffers; there are no gaps apart from at the end of the
+ * buffer array (in non-reliable mode). If non-%NULL on input,
+ * #NiceInputMessage::from will have the address of the sending peer stored in
+ * it. The base addresses, sizes, and number of buffers in each message will not
+ * be modified in any case.
*
* This must not be used in combination with nice_agent_attach_recv() on the
* same stream/component pair.
*
- * Internally, this may iterate the current thread’s default main context.
- *
* If the stream/component pair doesn’t exist, or if a suitable candidate socket
* hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
* returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
* cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
*
+ * Returns: the number of valid messages written to @messages on success
+ * (guaranteed to be greater than 0 unless @n_messages is 0), or -1 on error
+ *
+ * Since: 0.1.5
+ */
+NICEAPI_EXPORT gint
+nice_agent_recv_messages (NiceAgent *agent, guint stream_id, guint component_id,
+ NiceInputMessage *messages, guint n_messages, GCancellable *cancellable,
+ GError **error)
+{
+ return nice_agent_recv_messages_blocking_or_nonblocking (agent, stream_id,
+ component_id, TRUE, messages, n_messages, cancellable, error);
+}
+
+/**
+ * nice_agent_recv:
+ * @agent: a #NiceAgent
+ * @stream_id: the ID of the stream to receive on
+ * @component_id: the ID of the component to receive on
+ * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
+ * to write the received data into, of length at least @buf_len
+ * @buf_len: length of @buf
+ * @cancellable: (allow-none): a #GCancellable to allow the operation to be
+ * cancelled from another thread, or %NULL
+ * @error: (allow-none): return location for a #GError, or %NULL
+ *
+ * A single-message version of nice_agent_recv_messages().
+ *
* Returns: the number of bytes written to @buf on success (guaranteed to be
* greater than 0 unless @buf_len is 0), or -1 on error
*
@@ -2887,47 +3153,80 @@ NICEAPI_EXPORT gssize
nice_agent_recv (NiceAgent *agent, guint stream_id, guint component_id,
guint8 *buf, gsize buf_len, GCancellable *cancellable, GError **error)
{
- return nice_agent_recv_blocking_or_nonblocking (agent, stream_id,
- component_id, TRUE, buf, buf_len, cancellable, error);
+ gint n_valid_messages;
+ GInputVector local_bufs = { buf, buf_len };
+ NiceInputMessage local_messages = { &local_bufs, 1, NULL, 0 };
+
+ n_valid_messages = nice_agent_recv_messages (agent, stream_id, component_id,
+ &local_messages, 1, cancellable, error);
+
+ if (n_valid_messages <= 0)
+ return n_valid_messages;
+
+ return local_messages.length;
}
/**
- * nice_agent_recv_nonblocking:
+ * nice_agent_recv_messages_nonblocking:
* @agent: a #NiceAgent
* @stream_id: the ID of the stream to receive on
* @component_id: the ID of the component to receive on
- * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
- * to write the received data into, of length at least @buf_len
- * @buf_len: length of @buf
+ * @messages: (array length=n_messages) (out caller-allocates): caller-allocated
+ * array of #NiceInputMessages to write the received messages into, of length at
+ * least @n_messages
+ * @n_messages: number of entries in @messages
* @cancellable: (allow-none): a #GCancellable to allow the operation to be
* cancelled from another thread, or %NULL
* @error: (allow-none): return location for a #GError, or %NULL
*
* Try to receive data from the given stream/component combination on @agent,
- * without blocking. If receiving data would block, -1 is returned and a
+ * without blocking. If receiving data would block, -1 is returned and
* %G_IO_ERROR_WOULD_BLOCK is set in @error. If any other error occurs, -1 is
- * returned. Otherwise, 0 is returned if (and only if) @buf_len is 0. In all
- * other cases, the number of bytes read into @buf is returned, and will be
- * greater than 0.
+ * returned and @error is set accordingly. Otherwise, 0 is returned if (and only
+ * if) @n_messages is 0. In all other cases, the number of valid messages stored
+ * in @messages is returned, and will be greater than 0.
*
- * For a reliable @agent, this function will receive as many bytes as possible
- * up to @buf_len. For a non-reliable @agent, it will receive a single message.
- * In this case, @buf must be big enough to contain the entire message (65535
- * bytes), or any excess data may be silently dropped.
+ * This function behaves similarly to nice_agent_recv_messages(), except that it
+ * will not block on filling (in reliable mode) or receiving (in non-reliable
+ * mode) exactly @n_messages messages. In reliable mode, it will receive bytes
+ * into @messages until it would block; in non-reliable mode, it will receive
+ * messages until it would block.
*
* As this function is non-blocking, @cancellable is included only for parity
- * with nice_agent_recv(). If @cancellable is cancelled before this function is
- * called, a %G_IO_ERROR_CANCELLED error will be returned immediately.
+ * with nice_agent_recv_messages(). If @cancellable is cancelled before this
+ * function is called, a %G_IO_ERROR_CANCELLED error will be returned
+ * immediately.
*
* This must not be used in combination with nice_agent_attach_recv() on the
* same stream/component pair.
*
- * Internally, this may iterate the current thread’s default main context.
+ * Returns: the number of valid messages written to @messages on success
+ * (guaranteed to be greater than 0 unless @n_messages is 0), or -1 on error
*
- * If the stream/component pair doesn’t exist, or if a suitable candidate socket
- * hasn’t yet been selected for it, a %G_IO_ERROR_BROKEN_PIPE error will be
- * returned. A %G_IO_ERROR_CANCELLED error will be returned if the operation was
- * cancelled. %G_IO_ERROR_FAILED will be returned for other errors.
+ * Since: 0.1.5
+ */
+NICEAPI_EXPORT gint
+nice_agent_recv_messages_nonblocking (NiceAgent *agent, guint stream_id,
+ guint component_id, NiceInputMessage *messages, guint n_messages,
+ GCancellable *cancellable, GError **error)
+{
+ return nice_agent_recv_messages_blocking_or_nonblocking (agent, stream_id,
+ component_id, FALSE, messages, n_messages, cancellable, error);
+}
+
+/**
+ * nice_agent_recv_nonblocking:
+ * @agent: a #NiceAgent
+ * @stream_id: the ID of the stream to receive on
+ * @component_id: the ID of the component to receive on
+ * @buf: (array length=buf_len) (out caller-allocates): caller-allocated buffer
+ * to write the received data into, of length at least @buf_len
+ * @buf_len: length of @buf
+ * @cancellable: (allow-none): a #GCancellable to allow the operation to be
+ * cancelled from another thread, or %NULL
+ * @error: (allow-none): return location for a #GError, or %NULL
+ *
+ * A single-message version of nice_agent_recv_messages_nonblocking().
*
* Returns: the number of bytes received into @buf on success (guaranteed to be
* greater than 0 unless @buf_len is 0), or -1 on error
@@ -2939,8 +3238,17 @@ nice_agent_recv_nonblocking (NiceAgent *agent, guint stream_id,
guint component_id, guint8 *buf, gsize buf_len, GCancellable *cancellable,
GError **error)
{
- return nice_agent_recv_blocking_or_nonblocking (agent, stream_id,
- component_id, FALSE, buf, buf_len, cancellable, error);
+ gint n_valid_messages;
+ GInputVector local_bufs = { buf, buf_len };
+ NiceInputMessage local_messages = { &local_bufs, 1, NULL, 0 };
+
+ n_valid_messages = nice_agent_recv_messages_nonblocking (agent, stream_id,
+ component_id, &local_messages, 1, cancellable, error);
+
+ if (n_valid_messages <= 0)
+ return n_valid_messages;
+
+ return local_messages.length;
}
/**
@@ -3255,12 +3563,8 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
Component *component;
NiceAgent *agent;
Stream *stream;
- guint8 local_buf[MAX_BUFFER_SIZE];
- gssize len;
- guint8 *recv_buf;
- gsize recv_buf_len;
- gboolean retval = FALSE;
gboolean has_io_callback;
+ gboolean remove_source = FALSE;
agent_lock ();
@@ -3271,6 +3575,7 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
if (g_source_is_destroyed (g_main_current_source ())) {
/* Silently return FALSE. */
nice_debug ("%s: source %p destroyed", G_STRFUNC, g_main_current_source ());
+ remove_source = TRUE;
goto done;
}
@@ -3278,65 +3583,145 @@ component_io_cb (GSocket *socket, GIOCondition condition, gpointer user_data)
/* Choose which receive buffer to use. If we’re reading for
* nice_agent_attach_recv(), use a local static buffer. If we’re reading for
- * nice_agent_recv(), use the buffer provided by the client.
+ * nice_agent_recv_messages(), use the buffer provided by the client.
*
* has_io_callback cannot change throughout this function, as we operate
* entirely with the agent lock held, and component_set_io_callback() would
* need to take the agent lock to change the Component’s io_callback. */
- g_assert (!has_io_callback || component->recv_buf == NULL);
+ g_assert (!has_io_callback || component->recv_messages == NULL);
- if (has_io_callback) {
- recv_buf = local_buf;
- recv_buf_len = sizeof (local_buf);
- } else if (component->recv_buf != NULL) {
- recv_buf = component->recv_buf + component->recv_buf_valid_len;
- recv_buf_len = component->recv_buf_len - component->recv_buf_valid_len;
- } else {
- /* I/O is paused. Try again later. */
- retval = TRUE;
- goto done;
- }
+ if (agent->reliable) {
+#define TCP_HEADER_SIZE 24 /* bytes */
+ guint8 local_header_buf[TCP_HEADER_SIZE];
+ /* FIXME: Currently, the critical path for reliable packet delivery has two
+ * memcpy()s: one into the pseudo-TCP receive buffer, and one out of it.
+ * This could moderately easily be reduced to one memcpy() in the common
+ * case of in-order packet delivery, by replacing local_body_buf with a
+ * pointer into the pseudo-TCP receive buffer. If it turns out the packet
+ * is out-of-order (which we can only know after parsing its header), the
+ * data will need to be moved in the buffer. If the packet *is* in order,
+ * however, the only memcpy() then needed is from the pseudo-TCP receive
+ * buffer to the client’s message buffers.
+ *
+ * In fact, in the case of a reliable agent with I/O callbacks, zero
+ * memcpy()s can be achieved (for in-order packet delivery) by emittin the
+ * I/O callback directly from the pseudo-TCP receive buffer. */
+ guint8 local_body_buf[MAX_BUFFER_SIZE];
+ GInputVector local_bufs[] = {
+ { local_header_buf, sizeof (local_header_buf) },
+ { local_body_buf, sizeof (local_body_buf) },
+ };
+ NiceInputMessage local_message = {
+ local_bufs, G_N_ELEMENTS (local_bufs), NULL, 0
+ };
+ RecvStatus retval = 0;
+
+ if (component->tcp == NULL) {
+ nice_debug ("Agent %p: not handling incoming packet for s%d:%d "
+ "because pseudo-TCP socket does not exist in reliable mode.", agent,
+ stream->id, component->id);
+ remove_source = TRUE;
+ goto done;
+ }
- /* Actually read the data. This will return 0 if the data has already been
- * handled (e.g. for STUN control packets). */
- len = agent_recv_locked (agent, stream, component, socket_source->socket,
- recv_buf, recv_buf_len);
+ while (has_io_callback ||
+ (component->recv_messages != NULL &&
+ !nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+ component->recv_messages, component->n_recv_messages))) {
+ /* Receive a single message. This will receive it into the given
+ * @local_bufs then, for pseudo-TCP, emit I/O callbacks or copy it into
+ * component->recv_messages in pseudo_tcp_socket_readable(). STUN packets
+ * will be parsed in-place. */
+ retval = agent_recv_message_unlocked (agent, stream, component,
+ socket_source->socket, &local_message);
+
+ nice_debug ("%s: received %d valid messages with %" G_GSSIZE_FORMAT
+ " bytes", G_STRFUNC, retval, local_message.length);
+
+ /* Don’t expect any valid messages to escape pseudo_tcp_socket_readable()
+ * when in reliable mode. */
+ g_assert_cmpint (retval, !=, RECV_SUCCESS);
+
+ if (retval == RECV_WOULD_BLOCK) {
+ /* EWOULDBLOCK. */
+ break;
+ } else if (retval == RECV_ERROR) {
+ /* Other error. */
+ nice_debug ("%s: error receiving message", G_STRFUNC);
+ remove_source = TRUE;
+ break;
+ }
- nice_debug ("Received %" G_GSSIZE_FORMAT " bytes on source %p "
- "(socket %p, FD %d).", len,
- socket_source->source, socket_source->socket,
- g_socket_get_fd (socket_source->socket->fileno));
+ has_io_callback = component_has_io_callback (component);
+ }
+ } else if (!agent->reliable && has_io_callback) {
+ while (has_io_callback) {
+ guint8 local_buf[MAX_BUFFER_SIZE];
+ GInputVector local_bufs = { local_buf, sizeof (local_buf) };
+ NiceInputMessage local_message = { &local_bufs, 1, NULL, 0 };
+ RecvStatus retval;
+
+ /* Receive a single message. */
+ retval = agent_recv_message_unlocked (agent, stream, component,
+ socket_source->socket, &local_message);
+
+ nice_debug ("%s: received %d valid messages with %" G_GSSIZE_FORMAT
+ " bytes", G_STRFUNC, retval, local_message.length);
+
+ if (retval == RECV_WOULD_BLOCK) {
+ /* EWOULDBLOCK. */
+ break;
+ } else if (retval == RECV_ERROR) {
+ /* Other error. */
+ nice_debug ("%s: error receiving message", G_STRFUNC);
+ remove_source = TRUE;
+ break;
+ }
- if (len == 0) {
- /* No data was available, probably due to being a reliable connection and
- * hence the data is stored in the pseudotcp buffer. */
- retval = TRUE;
- goto done;
- } else if (len < 0) {
- /* Error. Detach the source but don’t close the socket. We don’t close the
- * socket because it would be way too complicated to take care of every path
- * where it might still be used. */
- g_set_error (component->recv_buf_error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Unable to receive from socket %p. Detaching.", socket);
- nice_debug ("%s: error receiving from socket %p", G_STRFUNC, socket);
- goto done;
- }
+ if (retval == RECV_SUCCESS && local_message.length > 0)
+ component_emit_io_callback (component, local_buf, local_message.length);
- /* Actual data to notify the client about. */
- if (has_io_callback) {
- component_emit_io_callback (component, recv_buf, len);
- } else {
- /* Data has been stored in the component’s receive buffer to be picked up
- * later by nice_agent_recv(). */
- component->recv_buf_valid_len += len;
+ has_io_callback = component_has_io_callback (component);
+ }
+ } else if (!agent->reliable && component->recv_messages != NULL) {
+ RecvStatus retval;
+
+ /* Don’t want to trample over partially-valid buffers. */
+ g_assert (component->recv_messages_iter.buffer == 0);
+ g_assert (component->recv_messages_iter.offset == 0);
+
+ while (!nice_input_message_iter_is_at_end (&component->recv_messages_iter,
+ component->recv_messages, component->n_recv_messages)) {
+ /* Receive a single message. This will receive it into the given
+ * user-provided #NiceInputMessage, which it’s the user’s responsibility
+ * to ensure is big enough to avoid data loss (since we’re in non-reliable
+ * mode). Iterate to receive as many messages as possible.
+ *
+ * STUN packets will be parsed in-place. */
+ retval = agent_recv_message_unlocked (agent, stream, component,
+ socket_source->socket,
+ &component->recv_messages[component->recv_messages_iter.message]);
+
+ nice_debug ("%s: received %d valid messages", G_STRFUNC, retval);
+
+ if (retval == RECV_SUCCESS) {
+ /* Successfully received a single message. */
+ component->recv_messages_iter.message++;
+ } else if (retval == RECV_WOULD_BLOCK) {
+ /* EWOULDBLOCK. */
+ break;
+ } else if (retval == RECV_ERROR) {
+ /* Other error. */
+ remove_source = TRUE;
+ break;
+ }
+ }
}
- retval = TRUE;
-
done:
agent_unlock ();
- return retval;
+ return !remove_source;
}
NICEAPI_EXPORT gboolean
diff --git a/agent/agent.h b/agent/agent.h
index 2577fce..c9c0b78 100644
--- a/agent/agent.h
+++ b/agent/agent.h
@@ -58,16 +58,16 @@
* for valid streams/components.
*
* Each stream can receive data in one of two ways: using
- * nice_agent_attach_recv() or nice_agent_recv() (and the derived
+ * nice_agent_attach_recv() or nice_agent_recv_messages() (and the derived
* #NiceInputStream and #NiceIOStream classes accessible using
* nice_agent_build_io_stream()). nice_agent_attach_recv() is non-blocking: it
* takes a user-provided callback function and attaches the stream’s socket to
* the provided #GMainContext, invoking the callback in that context for every
- * packet received. nice_agent_recv() instead blocks on receiving a packet, and
- * writes it directly into a user-provided buffer. This reduces the number of
- * callback invokations and (potentially) buffer copies required to receive
- * packets. nice_agent_recv() (or #NiceInputStream) is designed to be used in a
- * blocking loop in a separate thread.
+ * packet received. nice_agent_recv_messages() instead blocks on receiving a
+ * packet, and writes it directly into a user-provided buffer. This reduces the
+ * number of callback invokations and (potentially) buffer copies required to
+ * receive packets. nice_agent_recv_messages() (or #NiceInputStream) is designed
+ * to be used in a blocking loop in a separate thread.
*
<example>
<title>Simple example on how to use libnice</title>
@@ -721,7 +721,7 @@ nice_agent_restart (
* Attaches the stream's component's sockets to the Glib Mainloop Context in
* order to be notified whenever data becomes available for a component.
*
- * This must not be used in combination with nice_agent_recv() (or
+ * This must not be used in combination with nice_agent_recv_messages() (or
* #NiceIOStream or #NiceInputStream) on the same stream/component pair.
*
* Calling nice_agent_attach_recv() with a %NULL @func will detach any existing
@@ -751,6 +751,16 @@ nice_agent_recv (
GCancellable *cancellable,
GError **error);
+gint
+nice_agent_recv_messages (
+ NiceAgent *agent,
+ guint stream_id,
+ guint component_id,
+ NiceInputMessage *messages,
+ guint n_messages,
+ GCancellable *cancellable,
+ GError **error);
+
gssize
nice_agent_recv_nonblocking (
NiceAgent *agent,
@@ -761,6 +771,16 @@ nice_agent_recv_nonblocking (
GCancellable *cancellable,
GError **error);
+gint
+nice_agent_recv_messages_nonblocking (
+ NiceAgent *agent,
+ guint stream_id,
+ guint component_id,
+ NiceInputMessage *messages,
+ guint n_messages,
+ GCancellable *cancellable,
+ GError **error);
+
/**
* nice_agent_set_selected_pair:
* @agent: The #NiceAgent Object
diff --git a/agent/component.c b/agent/component.c
index 1b41942..9060a15 100644
--- a/agent/component.c
+++ b/agent/component.c
@@ -126,8 +126,8 @@ component_new (guint id, NiceAgent *agent, Stream *stream)
component->io_callback_id = 0;
/* Start off with a fresh main context and all I/O paused. This
- * will be updated when nice_agent_attach_recv() or nice_agent_recv() are
- * called. */
+ * will be updated when nice_agent_attach_recv() or nice_agent_recv_messages()
+ * are called. */
component_set_io_context (component, NULL);
component_set_io_callback (component, NULL, NULL, NULL, 0, NULL);
@@ -561,9 +561,9 @@ component_set_io_context (Component *component, GMainContext *context)
g_mutex_unlock (&component->io_mutex);
}
-/* (func, user_data) and (recv_buf, recv_buf_len) are mutually exclusive.
- * At most one of the two must be specified; if both are NULL, the Component
- * will not receive any data (i.e. reception is paused).
+/* (func, user_data) and (recv_messages, n_recv_messages) are mutually
+ * exclusive. At most one of the two must be specified; if both are NULL, the
+ * Component will not receive any data (i.e. reception is paused).
*
* Apart from during setup, this must always be called with the agent lock held,
* and the I/O lock released (because it takes the I/O lock itself). Requiring
@@ -574,11 +574,11 @@ component_set_io_context (Component *component, GMainContext *context)
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data,
- guint8 *recv_buf, gsize recv_buf_len,
+ NiceInputMessage *recv_messages, guint n_recv_messages,
GError **error)
{
- g_assert (func == NULL || recv_buf == NULL);
- g_assert (recv_buf != NULL || recv_buf_len == 0);
+ g_assert (func == NULL || recv_messages == NULL);
+ g_assert (n_recv_messages == 0 || recv_messages != NULL);
g_assert (error == NULL || *error == NULL);
g_mutex_lock (&component->io_mutex);
@@ -586,20 +586,20 @@ component_set_io_callback (Component *component,
if (func != NULL) {
component->io_callback = func;
component->io_user_data = user_data;
- component->recv_buf = NULL;
- component->recv_buf_len = 0;
+ component->recv_messages = NULL;
+ component->n_recv_messages = 0;
component_schedule_io_callback (component);
} else {
component->io_callback = NULL;
component->io_user_data = NULL;
- component->recv_buf = recv_buf;
- component->recv_buf_len = recv_buf_len;
+ component->recv_messages = recv_messages;
+ component->n_recv_messages = n_recv_messages;
component_deschedule_io_callback (component);
}
- component->recv_buf_valid_len = 0;
+ nice_input_message_iter_reset (&component->recv_messages_iter);
component->recv_buf_error = error;
g_mutex_unlock (&component->io_mutex);
diff --git a/agent/component.h b/agent/component.h
index c6e20c6..d76de0f 100644
--- a/agent/component.h
+++ b/agent/component.h
@@ -45,6 +45,7 @@
typedef struct _Component Component;
#include "agent.h"
+#include "agent-priv.h"
#include "candidate.h"
#include "stun/stunagent.h"
#include "stun/usages/timer.h"
@@ -110,7 +111,7 @@ typedef struct {
} SocketSource;
-/* A buffer of data which has been received and processed (so is guaranteed not
+/* A message which has been received and processed (so is guaranteed not
* to be a STUN packet, or to contain pseudo-TCP header bytes, for example), but
* which hasn’t yet been sent to the client in an I/O callback. This could be
* due to the main context not being run, or due to the I/O callback being
@@ -153,8 +154,8 @@ struct _Component
* socket recv() operations. All io_callback emissions are invoked in this
* context too.
*
- * recv_buf and io_callback are mutually exclusive, but it is allowed for both
- * to be NULL if the Component is not currently ready to receive data. */
+ * recv_messages and io_callback are mutually exclusive, but it is allowed for
+ * both to be NULL if the Component is not currently ready to receive data. */
GMutex io_mutex; /**< protects io_callback, io_user_data,
pending_io_messages and io_callback_id.
immutable: can be accessed without
@@ -163,7 +164,7 @@ struct _Component
taken before this one */
NiceAgentRecvFunc io_callback; /**< function called on io cb */
gpointer io_user_data; /**< data passed to the io function */
- GQueue pending_io_messages; /**< queue of packets which have been
+ GQueue pending_io_messages; /**< queue of messages which have been
received but not passed to the client
in an I/O callback or recv() call yet.
each element is an owned
@@ -172,9 +173,10 @@ struct _Component
GMainContext *ctx; /**< context for GSources for this
component */
- guint8 *recv_buf; /**< unowned buffer for receiving into */
- gsize recv_buf_len; /**< allocated size of recv_buf in bytes */
- gsize recv_buf_valid_len; /**< length of valid data in recv_buf */
+ NiceInputMessage *recv_messages; /**< unowned messages for receiving into */
+ guint n_recv_messages; /**< length of recv_messages */
+ NiceInputMessageIter recv_messages_iter; /**< current write position in
+ recv_messages */
GError **recv_buf_error; /**< error information about failed reads */
NiceAgent *agent; /* unowned, immutable: can be accessed without holding the
@@ -240,7 +242,7 @@ component_set_io_context (Component *component, GMainContext *context);
void
component_set_io_callback (Component *component,
NiceAgentRecvFunc func, gpointer user_data,
- guint8 *recv_buf, gsize recv_buf_len,
+ NiceInputMessage *recv_messages, guint n_recv_messages,
GError **error);
void
component_emit_io_callback (Component *component,
diff --git a/agent/pseudotcp.c b/agent/pseudotcp.c
index 58627dc..be7cfdc 100644
--- a/agent/pseudotcp.c
+++ b/agent/pseudotcp.c
@@ -74,6 +74,7 @@
#endif
#include "pseudotcp.h"
+#include "agent-priv.h"
G_DEFINE_TYPE (PseudoTcpSocket, pseudo_tcp_socket, G_TYPE_OBJECT);
@@ -497,8 +498,9 @@ static guint32 queue(PseudoTcpSocket *self, const gchar * data,
guint32 len, gboolean bCtrl);
static PseudoTcpWriteResult packet(PseudoTcpSocket *self, guint32 seq,
guint8 flags, guint32 offset, guint32 len);
-static gboolean parse(PseudoTcpSocket *self,
- const guint8 * buffer, guint32 size);
+static gboolean parse (PseudoTcpSocket *self,
+ const guint8 *_header_buf, gsize header_buf_len,
+ const guint8 *data_buf, gsize data_buf_len);
static gboolean process(PseudoTcpSocket *self, Segment *seg);
static gboolean transmit(PseudoTcpSocket *self, const GList *seg, guint32 now);
static void attempt_send(PseudoTcpSocket *self, SendFlags sflags);
@@ -882,12 +884,45 @@ pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
if (len > MAX_PACKET) {
//LOG_F(WARNING) << "packet too large";
return FALSE;
+ } else if (len < HEADER_SIZE) {
+ //LOG_F(WARNING) << "packet too small";
+ return FALSE;
+ }
+
+ /* Hold a reference to the PseudoTcpSocket during parsing, since it may be
+ * closed from within a callback. */
+ g_object_ref (self);
+ retval = parse (self, (guint8 *) buffer, HEADER_SIZE,
+ (guint8 *) buffer + HEADER_SIZE, len - HEADER_SIZE);
+ g_object_unref (self);
+
+ return retval;
+}
+
+/* Assume there are two buffers in the given #NiceInputMessage: a 24-byte one
+ * containing the header, and a bigger one for the data. */
+gboolean
+pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
+ NiceInputMessage *message)
+{
+ gboolean retval;
+
+ g_assert_cmpuint (message->n_buffers, ==, 2);
+ g_assert_cmpuint (message->buffers[0].size, ==, HEADER_SIZE);
+
+ if (message->length > MAX_PACKET) {
+ //LOG_F(WARNING) << "packet too large";
+ return FALSE;
+ } else if (message->length < HEADER_SIZE) {
+ //LOG_F(WARNING) << "packet too small";
+ return FALSE;
}
/* Hold a reference to the PseudoTcpSocket during parsing, since it may be
* closed from within a callback. */
g_object_ref (self);
- retval = parse (self, (guint8 *) buffer, len);
+ retval = parse (self, message->buffers[0].buffer, message->buffers[0].size,
+ message->buffers[1].buffer, message->length - message->buffers[0].size);
g_object_unref (self);
return retval;
@@ -1126,7 +1161,8 @@ packet(PseudoTcpSocket *self, guint32 seq, guint8 flags,
}
static gboolean
-parse(PseudoTcpSocket *self, const guint8 * _buffer, guint32 size)
+parse (PseudoTcpSocket *self, const guint8 *_header_buf, gsize header_buf_len,
+ const guint8 *data_buf, gsize data_buf_len)
{
Segment seg;
@@ -1134,24 +1170,24 @@ parse(PseudoTcpSocket *self, const guint8 * _buffer, guint32 size)
const guint8 *u8;
const guint16 *u16;
const guint32 *u32;
- } buffer;
+ } header_buf;
- buffer.u8 = _buffer;
+ header_buf.u8 = _header_buf;
- if (size < 12)
+ if (header_buf_len != 24)
return FALSE;
- seg.conv = ntohl(*buffer.u32);
- seg.seq = ntohl(*(buffer.u32 + 1));
- seg.ack = ntohl(*(buffer.u32 + 2));
- seg.flags = buffer.u8[13];
- seg.wnd = ntohs(*(buffer.u16 + 7));
+ seg.conv = ntohl(*header_buf.u32);
+ seg.seq = ntohl(*(header_buf.u32 + 1));
+ seg.ack = ntohl(*(header_buf.u32 + 2));
+ seg.flags = header_buf.u8[13];
+ seg.wnd = ntohs(*(header_buf.u16 + 7));
- seg.tsval = ntohl(*(buffer.u32 + 4));
- seg.tsecr = ntohl(*(buffer.u32 + 5));
+ seg.tsval = ntohl(*(header_buf.u32 + 4));
+ seg.tsecr = ntohl(*(header_buf.u32 + 5));
- seg.data = ((gchar *)buffer.u8) + HEADER_SIZE;
- seg.len = size - HEADER_SIZE;
+ seg.data = (const gchar *) data_buf;
+ seg.len = data_buf_len;
DEBUG (PSEUDO_TCP_DEBUG_VERBOSE, "--> <CONV=%d><FLG=%d><SEQ=%d:%d><ACK=%d>"
"<WND=%d><TS=%d><TSR=%d><LEN=%d>",
diff --git a/agent/pseudotcp.h b/agent/pseudotcp.h
index f5cd1a9..a43c3d4 100644
--- a/agent/pseudotcp.h
+++ b/agent/pseudotcp.h
@@ -66,6 +66,8 @@
# define ECONNRESET WSAECONNRESET
#endif
+#include "agent.h"
+
G_BEGIN_DECLS
/**
@@ -403,6 +405,22 @@ gboolean pseudo_tcp_socket_notify_packet(PseudoTcpSocket *self,
/**
+ * pseudo_tcp_socket_notify_message:
+ * @self: The #PseudoTcpSocket object.
+ * @message: A #NiceInputMessage containing the received data.
+ *
+ * Notify the #PseudoTcpSocket that a new message has arrived, and enqueue the
+ * data in its buffers to the #PseudoTcpSocket’s receive buffer.
+ *
+ * Returns: %TRUE if the packet was processed successfully, %FALSE otherwise
+ *
+ * Since: 0.1.5
+ */
+gboolean pseudo_tcp_socket_notify_message (PseudoTcpSocket *self,
+ NiceInputMessage *message);
+
+
+/**
* pseudo_tcp_set_debug_level:
* @level: The level of debug to set
*
diff --git a/docs/reference/libnice/libnice-sections.txt b/docs/reference/libnice/libnice-sections.txt
index 586e5cf..e8fb305 100644
--- a/docs/reference/libnice/libnice-sections.txt
+++ b/docs/reference/libnice/libnice-sections.txt
@@ -26,7 +26,9 @@ nice_agent_get_selected_pair
nice_agent_send
nice_agent_send_full
nice_agent_recv
+nice_agent_recv_messages
nice_agent_recv_nonblocking
+nice_agent_recv_messages_nonblocking
nice_agent_attach_recv
nice_agent_set_selected_pair
nice_agent_set_selected_remote_candidate
diff --git a/nice/libnice.sym b/nice/libnice.sym
index 92f553f..601afc6 100644
--- a/nice/libnice.sym
+++ b/nice/libnice.sym
@@ -18,7 +18,9 @@ nice_agent_add_local_address
nice_agent_add_stream
nice_agent_build_io_stream
nice_agent_recv
+nice_agent_recv_messages
nice_agent_recv_nonblocking
+nice_agent_recv_messages_nonblocking
nice_agent_attach_recv
nice_agent_gather_candidates
nice_agent_generate_local_candidate_sdp
diff --git a/socket/turn.c b/socket/turn.c
index b153be7..ce5011b 100644
--- a/socket/turn.c
+++ b/socket/turn.c
@@ -822,6 +822,45 @@ priv_binding_timeout (gpointer data)
return FALSE;
}
+guint
+nice_turn_socket_parse_recv_message (NiceSocket *sock, NiceSocket **from_sock,
+ NiceInputMessage *message)
+{
+ /* TODO: Speed this up in the common reliable case of having a 24-byte header
+ * buffer to begin with, followed by one or more massive buffers. */
+ guint8 *buf;
+ gsize buf_len, len;
+
+ if (message->n_buffers == 1 ||
+ (message->n_buffers == -1 &&
+ message->buffers[0].buffer != NULL &&
+ message->buffers[1].buffer == NULL)) {
+ /* Fast path. Single massive buffer. */
+ g_assert_cmpuint (message->length, <=, message->buffers[0].size);
+
+ len = nice_turn_socket_parse_recv (sock, from_sock,
+ message->from, message->length, message->buffers[0].buffer,
+ message->from, message->buffers[0].buffer, message->length);
+
+ g_assert_cmpuint (len, <=, message->length);
+
+ message->length = len;
+
+ return (len > 0) ? 1 : 0;
+ }
+
+ /* Slow path. */
+ nice_debug ("%s: **WARNING: SLOW PATH**", G_STRFUNC);
+
+ buf = compact_input_message (message, &buf_len);
+ len = nice_turn_socket_parse_recv (sock, from_sock,
+ message->from, buf_len, buf,
+ message->from, buf, buf_len);
+ len = memcpy_buffer_to_input_message (message, buf, len);
+
+ return (len > 0) ? 1 : 0;
+}
+
gsize
nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
NiceAddress *from, gsize len, guint8 *buf,
diff --git a/socket/turn.h b/socket/turn.h
index b700430..61d4e8c 100644
--- a/socket/turn.h
+++ b/socket/turn.h
@@ -52,6 +52,10 @@ typedef enum {
G_BEGIN_DECLS
+guint
+nice_turn_socket_parse_recv_message (NiceSocket *sock, NiceSocket **from_sock,
+ NiceInputMessage *message);
+
gsize
nice_turn_socket_parse_recv (NiceSocket *sock, NiceSocket **from_sock,
NiceAddress *from, gsize len, guint8 *buf,