summaryrefslogtreecommitdiff
path: root/socket/tcp-bsd.c
diff options
context:
space:
mode:
authorPhilip Withnall <philip.withnall@collabora.co.uk>2014-01-20 07:56:47 +0000
committerOlivier Crête <olivier.crete@collabora.com>2014-01-31 01:49:07 -0500
commit515481e6f45da24689d27c8eff60f4b5fa849c49 (patch)
tree90203dd3ce4e5a36dbc5e95d10c9bfa589e57a59 /socket/tcp-bsd.c
parent55e53a9ce7be57740993d8fd9b1ca71d410c6388 (diff)
downloadlibnice-515481e6f45da24689d27c8eff60f4b5fa849c49.tar.gz
socket: Add vectored I/O support for sending on sockets
Replace the send() API with a send_messages() API, which supports sending multiple messages, each with multiple buffers rather than a single monolithic buffer. This doesn’t break API, as the socket API is not exposed outside libnice. It does introduce a new struct: NiceOutputMessage, which is analogous to struct mmsghdr and NiceInputMessage. This includes updates to the test-bsd test to cover the changed API. The existing nice_socket_send() API has been retained as a thin wrapper around nice_socket_send_messages(), for convenience only. It’s hoped that internal usage of this API will decline to the point where it can be removed.
Diffstat (limited to 'socket/tcp-bsd.c')
-rw-r--r--socket/tcp-bsd.c162
1 files changed, 113 insertions, 49 deletions
diff --git a/socket/tcp-bsd.c b/socket/tcp-bsd.c
index da5a8d6..6e54b22 100644
--- a/socket/tcp-bsd.c
+++ b/socket/tcp-bsd.c
@@ -62,8 +62,8 @@ typedef struct {
} TcpPriv;
struct to_be_sent {
- guint length;
- gchar *buf;
+ guint8 *buf;
+ gsize length;
gboolean can_drop;
};
@@ -72,13 +72,13 @@ struct to_be_sent {
static void socket_close (NiceSocket *sock);
static gint socket_recv_messages (NiceSocket *sock,
NiceInputMessage *recv_messages, guint n_recv_messages);
-static gboolean socket_send (NiceSocket *sock, const NiceAddress *to,
- guint len, const gchar *buf);
+static gint socket_send_messages (NiceSocket *sock,
+ const NiceOutputMessage *messages, guint n_messages);
static gboolean socket_is_reliable (NiceSocket *sock);
-static void add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len,
- gboolean head);
+static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
+ gsize message_offset, gboolean head);
static void free_to_be_sent (struct to_be_sent *tbs);
static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition,
gpointer data);
@@ -169,7 +169,7 @@ nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr)
priv->error = FALSE;
sock->fileno = gsock;
- sock->send = socket_send;
+ sock->send_messages = socket_send_messages;
sock->recv_messages = socket_recv_messages;
sock->is_reliable = socket_is_reliable;
sock->close = socket_close;
@@ -252,59 +252,88 @@ socket_recv_messages (NiceSocket *sock,
return i;
}
-/* Data sent to this function must be a single entity because buffers can be
- * dropped if the bandwidth isn't fast enough. So do not send a message in
- * multiple chunks. */
-static gboolean
-socket_send (NiceSocket *sock, const NiceAddress *to,
- guint len, const gchar *buf)
+static gssize
+socket_send_message (NiceSocket *sock, const NiceOutputMessage *message)
{
TcpPriv *priv = sock->priv;
- int ret;
+ gssize ret;
GError *gerr = NULL;
/* Don't try to access the socket if it had an error, otherwise we risk a
- crash with SIGPIPE (Broken pipe) */
+ * crash with SIGPIPE (Broken pipe) */
if (priv->error)
return -1;
/* First try to send the data, don't send it later if it can be sent now
- this way we avoid allocating memory on every send */
+ * this way we avoid allocating memory on every send */
if (g_queue_is_empty (&priv->send_queue)) {
- ret = g_socket_send (sock->fileno, buf, len, NULL, &gerr);
+ ret = g_socket_send_message (sock->fileno, NULL, message->buffers,
+ message->n_buffers, NULL, 0, G_SOCKET_MSG_NONE, NULL, &gerr);
+
if (ret < 0) {
- if(g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)
- || g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
- add_to_be_sent (sock, buf, len, FALSE);
- g_error_free (gerr);
- return TRUE;
- } else {
- g_error_free (gerr);
- return FALSE;
+ if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) ||
+ g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) {
+ /* Queue the message and send it later. */
+ add_to_be_sent (sock, message, 0, FALSE);
+ ret = message->length;
}
- } else if ((guint)ret < len) {
- add_to_be_sent (sock, buf + ret, len - ret, TRUE);
- return TRUE;
+
+ g_error_free (gerr);
+ } else if ((gsize) ret < message->length) {
+ /* Partial send. */
+ add_to_be_sent (sock, message, ret, TRUE);
+ ret = message->length;
}
} else {
- if (g_queue_get_length(&priv->send_queue) >= MAX_QUEUE_LENGTH) {
- int peek_idx = 0;
+ /* If the queue is too long, drop whatever packets we can. */
+ if (g_queue_get_length (&priv->send_queue) >= MAX_QUEUE_LENGTH) {
+ guint peek_idx = 0;
struct to_be_sent *tbs = NULL;
+
while ((tbs = g_queue_peek_nth (&priv->send_queue, peek_idx)) != NULL) {
if (tbs->can_drop) {
tbs = g_queue_pop_nth (&priv->send_queue, peek_idx);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
+ free_to_be_sent (tbs);
break;
} else {
peek_idx++;
}
}
}
- add_to_be_sent (sock, buf, len, FALSE);
+
+ /* Queue the message and send it later. */
+ add_to_be_sent (sock, message, 0, FALSE);
+ ret = message->length;
}
- return TRUE;
+ return ret;
+}
+
+/* Data sent to this function must be a single entity because buffers can be
+ * dropped if the bandwidth isn't fast enough. So do not send a message in
+ * multiple chunks. */
+static gint
+socket_send_messages (NiceSocket *sock, const NiceOutputMessage *messages,
+ guint n_messages)
+{
+ guint i;
+
+ for (i = 0; i < n_messages; i++) {
+ const NiceOutputMessage *message = &messages[i];
+ gssize len;
+
+ len = socket_send_message (sock, message);
+
+ if (len < 0) {
+ /* Error. */
+ return len;
+ } else if (len == 0) {
+ /* EWOULDBLOCK. */
+ break;
+ }
+ }
+
+ return i;
}
static gboolean
@@ -348,28 +377,37 @@ socket_send_more (
/* connection hangs up */
ret = -1;
} else {
- ret = g_socket_send (sock->fileno, tbs->buf, tbs->length, NULL, &gerr);
+ GOutputVector local_bufs = { tbs->buf, tbs->length };
+ ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0,
+ G_SOCKET_MSG_NONE, NULL, &gerr);
}
if (ret < 0) {
- if(gerr != NULL &&
+ if (gerr != NULL &&
g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- add_to_be_sent (sock, tbs->buf, tbs->length, TRUE);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
+ GOutputVector local_buf = { tbs->buf, tbs->length };
+ NiceOutputMessage local_message = {
+ &local_buf, 1, NULL, local_buf.size
+ };
+
+ add_to_be_sent (sock, &local_message, 0, TRUE);
+ free_to_be_sent (tbs);
g_error_free (gerr);
break;
}
g_error_free (gerr);
} else if (ret < (int) tbs->length) {
- add_to_be_sent (sock, tbs->buf + ret, tbs->length - ret, TRUE);
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
+ GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret };
+ NiceOutputMessage local_message = {
+ &local_buf, 1, NULL, local_buf.size
+ };
+
+ add_to_be_sent (sock, &local_message, 0, TRUE);
+ free_to_be_sent (tbs);
break;
}
- g_free (tbs->buf);
- g_slice_free (struct to_be_sent, tbs);
+ free_to_be_sent (tbs);
}
if (g_queue_is_empty (&priv->send_queue)) {
@@ -386,19 +424,25 @@ socket_send_more (
}
+/* Queue data starting at byte offset @message_offset from @message’s
+ * buffers. */
static void
-add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len, gboolean head)
+add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message,
+ gsize message_offset, gboolean head)
{
TcpPriv *priv = sock->priv;
- struct to_be_sent *tbs = NULL;
+ struct to_be_sent *tbs;
+ guint j;
+ gsize offset = 0;
- if (len <= 0)
+ if (message_offset >= message->length)
return;
tbs = g_slice_new0 (struct to_be_sent);
- tbs->buf = g_memdup (buf, len);
- tbs->length = len;
+ tbs->buf = g_malloc (message->length - message_offset);
+ tbs->length = message->length - message_offset;
tbs->can_drop = !head;
+
if (head)
g_queue_push_head (&priv->send_queue, tbs);
else
@@ -410,6 +454,26 @@ add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len, gboolean head)
sock, NULL);
g_source_attach (priv->io_source, priv->context);
}
+
+ /* Move the data into the buffer. */
+ for (j = 0;
+ (message->n_buffers >= 0 && j < (guint) message->n_buffers) ||
+ (message->n_buffers < 0 && message->buffers[j].buffer != NULL);
+ j++) {
+ const GOutputVector *buffer = &message->buffers[j];
+ gsize len;
+
+ /* Skip this buffer if it’s within @message_offset. */
+ if (buffer->size <= message_offset) {
+ message_offset -= buffer->size;
+ continue;
+ }
+
+ len = MIN (tbs->length - offset, buffer->size - message_offset);
+ memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len);
+ offset += len;
+ message_offset -= len;
+ }
}