diff options
author | Philip Withnall <philip.withnall@collabora.co.uk> | 2014-01-20 07:56:47 +0000 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2014-01-31 01:49:07 -0500 |
commit | 515481e6f45da24689d27c8eff60f4b5fa849c49 (patch) | |
tree | 90203dd3ce4e5a36dbc5e95d10c9bfa589e57a59 /socket/tcp-bsd.c | |
parent | 55e53a9ce7be57740993d8fd9b1ca71d410c6388 (diff) | |
download | libnice-515481e6f45da24689d27c8eff60f4b5fa849c49.tar.gz |
socket: Add vectored I/O support for sending on sockets
Replace the send() API with a send_messages() API, which supports
sending multiple messages, each with multiple buffers rather than a
single monolithic buffer.
This doesn’t break API, as the socket API is not exposed outside
libnice. It does introduce a new struct: NiceOutputMessage, which is
analogous to struct mmsghdr and NiceInputMessage.
This includes updates to the test-bsd test to cover the changed API.
The existing nice_socket_send() API has been retained as a thin wrapper
around nice_socket_send_messages(), for convenience only. It’s hoped
that internal usage of this API will decline to the point where it can
be removed.
Diffstat (limited to 'socket/tcp-bsd.c')
-rw-r--r-- | socket/tcp-bsd.c | 162 |
1 files changed, 113 insertions, 49 deletions
diff --git a/socket/tcp-bsd.c b/socket/tcp-bsd.c index da5a8d6..6e54b22 100644 --- a/socket/tcp-bsd.c +++ b/socket/tcp-bsd.c @@ -62,8 +62,8 @@ typedef struct { } TcpPriv; struct to_be_sent { - guint length; - gchar *buf; + guint8 *buf; + gsize length; gboolean can_drop; }; @@ -72,13 +72,13 @@ struct to_be_sent { static void socket_close (NiceSocket *sock); static gint socket_recv_messages (NiceSocket *sock, NiceInputMessage *recv_messages, guint n_recv_messages); -static gboolean socket_send (NiceSocket *sock, const NiceAddress *to, - guint len, const gchar *buf); +static gint socket_send_messages (NiceSocket *sock, + const NiceOutputMessage *messages, guint n_messages); static gboolean socket_is_reliable (NiceSocket *sock); -static void add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len, - gboolean head); +static void add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message, + gsize message_offset, gboolean head); static void free_to_be_sent (struct to_be_sent *tbs); static gboolean socket_send_more (GSocket *gsocket, GIOCondition condition, gpointer data); @@ -169,7 +169,7 @@ nice_tcp_bsd_socket_new (GMainContext *ctx, NiceAddress *addr) priv->error = FALSE; sock->fileno = gsock; - sock->send = socket_send; + sock->send_messages = socket_send_messages; sock->recv_messages = socket_recv_messages; sock->is_reliable = socket_is_reliable; sock->close = socket_close; @@ -252,59 +252,88 @@ socket_recv_messages (NiceSocket *sock, return i; } -/* Data sent to this function must be a single entity because buffers can be - * dropped if the bandwidth isn't fast enough. So do not send a message in - * multiple chunks. */ -static gboolean -socket_send (NiceSocket *sock, const NiceAddress *to, - guint len, const gchar *buf) +static gssize +socket_send_message (NiceSocket *sock, const NiceOutputMessage *message) { TcpPriv *priv = sock->priv; - int ret; + gssize ret; GError *gerr = NULL; /* Don't try to access the socket if it had an error, otherwise we risk a - crash with SIGPIPE (Broken pipe) */ + * crash with SIGPIPE (Broken pipe) */ if (priv->error) return -1; /* First try to send the data, don't send it later if it can be sent now - this way we avoid allocating memory on every send */ + * this way we avoid allocating memory on every send */ if (g_queue_is_empty (&priv->send_queue)) { - ret = g_socket_send (sock->fileno, buf, len, NULL, &gerr); + ret = g_socket_send_message (sock->fileno, NULL, message->buffers, + message->n_buffers, NULL, 0, G_SOCKET_MSG_NONE, NULL, &gerr); + if (ret < 0) { - if(g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) - || g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) { - add_to_be_sent (sock, buf, len, FALSE); - g_error_free (gerr); - return TRUE; - } else { - g_error_free (gerr); - return FALSE; + if (g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) || + g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_FAILED)) { + /* Queue the message and send it later. */ + add_to_be_sent (sock, message, 0, FALSE); + ret = message->length; } - } else if ((guint)ret < len) { - add_to_be_sent (sock, buf + ret, len - ret, TRUE); - return TRUE; + + g_error_free (gerr); + } else if ((gsize) ret < message->length) { + /* Partial send. */ + add_to_be_sent (sock, message, ret, TRUE); + ret = message->length; } } else { - if (g_queue_get_length(&priv->send_queue) >= MAX_QUEUE_LENGTH) { - int peek_idx = 0; + /* If the queue is too long, drop whatever packets we can. */ + if (g_queue_get_length (&priv->send_queue) >= MAX_QUEUE_LENGTH) { + guint peek_idx = 0; struct to_be_sent *tbs = NULL; + while ((tbs = g_queue_peek_nth (&priv->send_queue, peek_idx)) != NULL) { if (tbs->can_drop) { tbs = g_queue_pop_nth (&priv->send_queue, peek_idx); - g_free (tbs->buf); - g_slice_free (struct to_be_sent, tbs); + free_to_be_sent (tbs); break; } else { peek_idx++; } } } - add_to_be_sent (sock, buf, len, FALSE); + + /* Queue the message and send it later. */ + add_to_be_sent (sock, message, 0, FALSE); + ret = message->length; } - return TRUE; + return ret; +} + +/* Data sent to this function must be a single entity because buffers can be + * dropped if the bandwidth isn't fast enough. So do not send a message in + * multiple chunks. */ +static gint +socket_send_messages (NiceSocket *sock, const NiceOutputMessage *messages, + guint n_messages) +{ + guint i; + + for (i = 0; i < n_messages; i++) { + const NiceOutputMessage *message = &messages[i]; + gssize len; + + len = socket_send_message (sock, message); + + if (len < 0) { + /* Error. */ + return len; + } else if (len == 0) { + /* EWOULDBLOCK. */ + break; + } + } + + return i; } static gboolean @@ -348,28 +377,37 @@ socket_send_more ( /* connection hangs up */ ret = -1; } else { - ret = g_socket_send (sock->fileno, tbs->buf, tbs->length, NULL, &gerr); + GOutputVector local_bufs = { tbs->buf, tbs->length }; + ret = g_socket_send_message (sock->fileno, NULL, &local_bufs, 1, NULL, 0, + G_SOCKET_MSG_NONE, NULL, &gerr); } if (ret < 0) { - if(gerr != NULL && + if (gerr != NULL && g_error_matches (gerr, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - add_to_be_sent (sock, tbs->buf, tbs->length, TRUE); - g_free (tbs->buf); - g_slice_free (struct to_be_sent, tbs); + GOutputVector local_buf = { tbs->buf, tbs->length }; + NiceOutputMessage local_message = { + &local_buf, 1, NULL, local_buf.size + }; + + add_to_be_sent (sock, &local_message, 0, TRUE); + free_to_be_sent (tbs); g_error_free (gerr); break; } g_error_free (gerr); } else if (ret < (int) tbs->length) { - add_to_be_sent (sock, tbs->buf + ret, tbs->length - ret, TRUE); - g_free (tbs->buf); - g_slice_free (struct to_be_sent, tbs); + GOutputVector local_buf = { tbs->buf + ret, tbs->length - ret }; + NiceOutputMessage local_message = { + &local_buf, 1, NULL, local_buf.size + }; + + add_to_be_sent (sock, &local_message, 0, TRUE); + free_to_be_sent (tbs); break; } - g_free (tbs->buf); - g_slice_free (struct to_be_sent, tbs); + free_to_be_sent (tbs); } if (g_queue_is_empty (&priv->send_queue)) { @@ -386,19 +424,25 @@ socket_send_more ( } +/* Queue data starting at byte offset @message_offset from @message’s + * buffers. */ static void -add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len, gboolean head) +add_to_be_sent (NiceSocket *sock, const NiceOutputMessage *message, + gsize message_offset, gboolean head) { TcpPriv *priv = sock->priv; - struct to_be_sent *tbs = NULL; + struct to_be_sent *tbs; + guint j; + gsize offset = 0; - if (len <= 0) + if (message_offset >= message->length) return; tbs = g_slice_new0 (struct to_be_sent); - tbs->buf = g_memdup (buf, len); - tbs->length = len; + tbs->buf = g_malloc (message->length - message_offset); + tbs->length = message->length - message_offset; tbs->can_drop = !head; + if (head) g_queue_push_head (&priv->send_queue, tbs); else @@ -410,6 +454,26 @@ add_to_be_sent (NiceSocket *sock, const gchar *buf, guint len, gboolean head) sock, NULL); g_source_attach (priv->io_source, priv->context); } + + /* Move the data into the buffer. */ + for (j = 0; + (message->n_buffers >= 0 && j < (guint) message->n_buffers) || + (message->n_buffers < 0 && message->buffers[j].buffer != NULL); + j++) { + const GOutputVector *buffer = &message->buffers[j]; + gsize len; + + /* Skip this buffer if it’s within @message_offset. */ + if (buffer->size <= message_offset) { + message_offset -= buffer->size; + continue; + } + + len = MIN (tbs->length - offset, buffer->size - message_offset); + memcpy (tbs->buf + offset, (guint8 *) buffer->buffer + message_offset, len); + offset += len; + message_offset -= len; + } } |