diff options
author | Philip Withnall <philip.withnall@collabora.co.uk> | 2014-01-21 15:56:18 +0000 |
---|---|---|
committer | Olivier Crête <olivier.crete@collabora.com> | 2014-01-31 01:49:07 -0500 |
commit | f7b15f1444f119ad5c3c835a27faca59e74059d2 (patch) | |
tree | cf1227e6fb452bb9d11060cfe5a0ef120a481338 /tests | |
parent | 515481e6f45da24689d27c8eff60f4b5fa849c49 (diff) | |
download | libnice-f7b15f1444f119ad5c3c835a27faca59e74059d2.tar.gz |
agent: Add support for vectored I/O for sends
Add one new public function, nice_agent_send_messages_nonblocking(),
which replaces nice_agent_send_full(). This isn’t an API break, because
nice_agent_send_full() hasn’t been in a release yet. The new API allows
sending multiple messages in a single call, and supports vectors of
buffers to transmit the messages from.
The existing nice_agent_send() API has been left untouched, although
it’s a bit of a bugbear because it’s non-blocking and doesn’t fit with
the new *_nonblocking() naming scheme. Oh well.
This doesn’t bring any notable changes to the number of memcpy()s on the
critical path: it remains at zero for the common cases and common socket
types. It introduces the possibility for future work to eliminate some
memcpy()s in more complex socket types, like tcp-turn and tcp-bsd, but
these optimisations have not been made yet. FIXME comments have been
added.
This includes modifications to the test-send-recv unit test to cover the
new API.
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test-send-recv.c | 246 |
1 files changed, 193 insertions, 53 deletions
diff --git a/tests/test-send-recv.c b/tests/test-send-recv.c index 1dd1de4..4650ba1 100644 --- a/tests/test-send-recv.c +++ b/tests/test-send-recv.c @@ -64,6 +64,9 @@ #include <unistd.h> #endif +/* Maximum IP payload ((1 << 16) - 1), minus IP header, minus UDP header. */ +#define MAX_MESSAGE_SIZE (65535 - 20 - 8) /* bytes */ + typedef enum { STREAM_AGENT, /* nice_agent_[send|recv]() */ STREAM_AGENT_NONBLOCKING, /* nice_agent_[send|recv]_nonblocking() */ @@ -74,7 +77,7 @@ typedef enum { typedef enum { BUFFER_SIZE_CONSTANT_LARGE, /* always 65535 bytes */ - BUFFER_SIZE_CONSTANT_SMALL, /* always 1024 bytes */ + BUFFER_SIZE_CONSTANT_SMALL, /* always 4096 bytes */ BUFFER_SIZE_CONSTANT_TINY, /* always 1 byte */ BUFFER_SIZE_ASCENDING, /* ascending powers of 2 */ BUFFER_SIZE_RANDOM, /* random every time */ @@ -118,6 +121,7 @@ typedef struct { } receive; BufferDataStrategy buffer_data_strategy; gsize n_bytes; + guint n_messages; /* Test state. */ GRand *transmit_size_rand; @@ -125,6 +129,9 @@ typedef struct { gsize transmitted_bytes; gsize received_bytes; gsize *other_received_bytes; + guint transmitted_messages; + guint received_messages; + guint *other_received_messages; } TestData; /* Whether @stream_api is blocking (vs. non-blocking). */ @@ -327,13 +334,18 @@ generate_buffer_to_receive (TestIOStreamThreadData *data, gsize buffer_offset, * * @max_buffer_size may be used to limit the total size of all the buffers in * all the messages, for example to avoid blocking on receiving data which will - * never be sent. + * never be sent. This only applies for blocking, reliable stream APIs. + * + * @max_n_messages may be used to limit the number of messages generated, to + * avoid blocking on receiving messages which will never be sent. This only + * applies for blocking, non-reliable stream APIs. * * @messages must be freed with g_free(), as must all of the buffer arrays and * the buffers themselves. */ static void generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset, - NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size) + NiceInputMessage **messages, guint *n_messages, gsize max_buffer_size, + guint max_n_messages) { TestData *test_data = data->user_data; guint i; @@ -342,6 +354,10 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset, *n_messages = generate_message_count (test_data->receive.message_count_strategy, test_data->receive_size_rand, buffer_offset); + + if (!data->reliable) + *n_messages = MIN (*n_messages, max_n_messages); + *messages = g_malloc_n (*n_messages, sizeof (NiceInputMessage)); for (i = 0; i < *n_messages; i++) { @@ -364,12 +380,12 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset, test_data->receive_size_rand, buffer_offset); /* Trim the buffer length if it would otherwise cause the API to block. */ - if (data->reliable) + if (data->reliable) { buf_len = MIN (buf_len, max_buffer_size); + max_buffer_size -= buf_len; + } - max_buffer_size -= buf_len; buffer->size = buf_len; - buffer->buffer = g_malloc (buffer->size); /* Fill it with poison to try and detect incorrect writes. */ @@ -377,7 +393,7 @@ generate_messages_to_receive (TestIOStreamThreadData *data, gsize buffer_offset, /* If we’ve hit the max_buffer_size, adjust the buffer and message counts * and run away. */ - if (max_buffer_size == 0) { + if (data->reliable && max_buffer_size == 0) { message->n_buffers = j + 1; *n_messages = i + 1; return; @@ -402,12 +418,17 @@ validate_received_buffer (TestIOStreamThreadData *data, gsize buffer_offset, if (stream_api_is_blocking (test_data->stream_api) && data->reliable) g_assert_cmpint (len, ==, buf_len); - /* Validate the buffer contents. */ + /* Validate the buffer contents. + * + * Note: Buffers can only be validated up to valid_len. The buffer may + * have been re-used internally (e.g. by receiving a STUN message, then + * overwriting it with a data packet), so we can’t guarantee that the + * bytes beyond valid_len have been untouched. */ expected_buf = g_malloc (buf_len); memset (expected_buf, 0xaa, buf_len); generate_buffer_data (test_data->buffer_data_strategy, buffer_offset, expected_buf, len); - g_assert (memcmp (*buf, expected_buf, buf_len) == 0); + g_assert (memcmp (*buf, expected_buf, len) == 0); g_free (expected_buf); test_data->received_bytes += len; @@ -431,6 +452,8 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset, if (stream_api_is_blocking (test_data->stream_api)) g_assert_cmpint (n_valid_messages, ==, n_messages); + test_data->received_messages += n_valid_messages; + /* Validate the message contents. */ for (i = 0; i < (guint) n_valid_messages; i++) { NiceInputMessage *message = &((*messages)[i]); @@ -445,6 +468,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset, guint8 *expected_buf; gsize valid_len; + /* See note above about valid_len. */ total_buf_len += buffer->size; valid_len = MIN (message_len_remaining, buffer->size); @@ -452,7 +476,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset, memset (expected_buf, 0xaa, buffer->size); generate_buffer_data (test_data->buffer_data_strategy, buffer_offset, expected_buf, valid_len); - g_assert (memcmp (buffer->buffer, expected_buf, buffer->size) == 0); + g_assert (memcmp (buffer->buffer, expected_buf, valid_len) == 0); g_free (expected_buf); test_data->received_bytes += valid_len; @@ -471,7 +495,7 @@ validate_received_messages (TestIOStreamThreadData *data, gsize buffer_offset, prev_message_len = message->length; /* If the API was blocking, it should have completely filled the message. */ - if (stream_api_is_blocking (test_data->stream_api)) + if (stream_api_is_blocking (test_data->stream_api) && data->reliable) g_assert_cmpuint (message->length, ==, total_buf_len); g_assert (message->from == NULL); @@ -501,6 +525,79 @@ generate_buffer_to_transmit (TestIOStreamThreadData *data, gsize buffer_offset, *buf, *buf_len); } +/* Similar to generate_buffer_to_transmit(), except that it generates an array + * of NiceOutputMessages rather than a single buffer. */ +static void +generate_messages_to_transmit (TestIOStreamThreadData *data, + gsize buffer_offset, NiceOutputMessage **messages, guint *n_messages) +{ + TestData *test_data = data->user_data; + guint i; + gsize total_buf_len = 0; + + /* Determine the number of messages to send. */ + *n_messages = + generate_message_count (test_data->transmit.message_count_strategy, + test_data->transmit_size_rand, buffer_offset); + *n_messages = + MIN (*n_messages, + test_data->n_messages - test_data->transmitted_messages); + + *messages = g_malloc_n (*n_messages, sizeof (NiceOutputMessage)); + + for (i = 0; i < *n_messages; i++) { + NiceOutputMessage *message = &((*messages)[i]); + guint j; + gsize max_message_size; + + message->n_buffers = + generate_buffer_count (test_data->transmit.buffer_count_strategy, + test_data->transmit_size_rand, buffer_offset); + message->buffers = g_malloc_n (message->n_buffers, sizeof (GOutputVector)); + message->to = NULL; + message->length = 0; + + /* Limit the overall message size to the smaller of (n_bytes / n_messages) + * and MAX_MESSAGE_SIZE, to ensure each message is non-empty. */ + max_message_size = + MIN ((test_data->n_bytes / test_data->n_messages), MAX_MESSAGE_SIZE); + + for (j = 0; j < (guint) message->n_buffers; j++) { + GOutputVector *buffer = &message->buffers[j]; + gsize buf_len; + guint8 *buf; + + buf_len = + generate_buffer_size (test_data->transmit.buffer_size_strategy, + test_data->transmit_size_rand, buffer_offset); + buf_len = + MIN (buf_len, + test_data->n_bytes - test_data->transmitted_bytes - total_buf_len); + buf_len = MIN (buf_len, max_message_size - message->length); + + buffer->size = buf_len; + buf = g_malloc (buffer->size); + buffer->buffer = buf; + message->length += buf_len; + total_buf_len += buf_len; + + /* Fill it with data. */ + generate_buffer_data (test_data->buffer_data_strategy, buffer_offset, + buf, buf_len); + + buffer_offset += buf_len; + + /* Reached the maximum UDP payload size? */ + if (message->length >= max_message_size) { + message->n_buffers = j + 1; + break; + } + } + + g_assert_cmpuint (message->length, <=, max_message_size); + } +} + /* Validate the number of bytes transmitted, and update the test’s internal * state machine. Consumes @buf. */ static void @@ -517,6 +614,39 @@ notify_transmitted_buffer (TestIOStreamThreadData *data, gsize buffer_offset, g_free (*buf); } +/* Similar to notify_transmitted_buffer(), except it operates on an array of + * messages from generate_messages_to_transmit(). */ +static void +notify_transmitted_messages (TestIOStreamThreadData *data, gsize buffer_offset, + NiceOutputMessage **messages, guint n_messages, gint n_sent_messages) +{ + TestData *test_data = data->user_data; + guint i; + + g_assert_cmpint (n_sent_messages, <=, n_messages); + g_assert_cmpint (n_sent_messages, >=, 0); + + test_data->transmitted_messages += n_sent_messages; + + for (i = 0; i < n_messages; i++) { + NiceOutputMessage *message = &((*messages)[i]); + guint j; + + if (i < (guint) n_sent_messages) + test_data->transmitted_bytes += message->length; + + for (j = 0; j < (guint) message->n_buffers; j++) { + GOutputVector *buffer = &message->buffers[j]; + + g_free ((guint8 *) buffer->buffer); + } + + g_free (message->buffers); + } + + g_free (*messages); +} + /* * Implementation using nice_agent_recv_messages() and nice_agent_send(). */ @@ -539,7 +669,8 @@ read_thread_agent_cb (GInputStream *input_stream, TestIOStreamThreadData *data) /* Initialise an array of messages to receive into. */ generate_messages_to_receive (data, test_data->received_bytes, &messages, - &n_messages, test_data->n_bytes - test_data->received_bytes); + &n_messages, test_data->n_bytes - test_data->received_bytes, + test_data->n_messages - test_data->received_messages); /* Block on receiving some data. */ n_valid_messages = nice_agent_recv_messages (data->agent, stream_id, @@ -570,37 +701,26 @@ write_thread_agent_cb (GOutputStream *output_stream, while (test_data->transmitted_bytes < test_data->n_bytes) { GError *error = NULL; - guint8 *buf = NULL; - gsize buf_len = 0; - gssize _len; - gssize len = 0; + NiceOutputMessage *messages; + guint n_messages; + gint n_sent_messages; /* Generate a buffer to transmit. */ - generate_buffer_to_transmit (data, test_data->transmitted_bytes, &buf, - &buf_len); + generate_messages_to_transmit (data, test_data->transmitted_bytes, + &messages, &n_messages); - /* Transmit it. */ + /* Busy loop on receiving some data. */ do { - _len = nice_agent_send_full (data->agent, stream_id, component_id, - buf + len, buf_len - len, NULL, &error); - - /* Busy loop on EWOULDBLOCK. */ - if (_len == -1 && - g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { - g_clear_error (&error); - continue; - } else if (_len > 0) { - len += _len; - } else { - len = _len; - } - - g_assert_no_error (error); - } while (len != -1 && (gsize) len < buf_len); + g_clear_error (&error); + n_sent_messages = nice_agent_send_messages_nonblocking (data->agent, + stream_id, component_id, messages, n_messages, NULL, &error); + } while (n_sent_messages == -1 && + g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)); + g_assert_no_error (error); /* Update the test’s buffer generation state machine. */ - notify_transmitted_buffer (data, test_data->transmitted_bytes, &buf, - buf_len, len); + notify_transmitted_messages (data, test_data->transmitted_bytes, &messages, + n_messages, n_sent_messages); } } @@ -628,7 +748,12 @@ read_thread_agent_nonblocking_cb (GInputStream *input_stream, /* Initialise an array of messages to receive into. */ generate_messages_to_receive (data, test_data->received_bytes, &messages, - &n_messages, test_data->n_bytes - test_data->received_bytes); + &n_messages, test_data->n_bytes - test_data->received_bytes, + test_data->n_messages - test_data->received_messages); + + /* Trim n_messages to avoid consuming the ‘done’ message. */ + n_messages = + MIN (n_messages, test_data->n_messages - test_data->received_messages); /* Busy loop on receiving some data. */ do { @@ -889,18 +1014,21 @@ write_thread_gsource_cb (GOutputStream *output_stream, static void test_data_init (TestData *data, gboolean reliable, StreamApi stream_api, - gsize n_bytes, BufferSizeStrategy transmit_buffer_size_strategy, + gsize n_bytes, guint n_messages, + BufferSizeStrategy transmit_buffer_size_strategy, BufferCountStrategy transmit_buffer_count_strategy, MessageCountStrategy transmit_message_count_strategy, BufferSizeStrategy receive_buffer_size_strategy, BufferCountStrategy receive_buffer_count_strategy, MessageCountStrategy receive_message_count_strategy, BufferDataStrategy buffer_data_strategy, guint32 transmit_seed, - guint32 receive_seed, gsize *other_received_bytes) + guint32 receive_seed, gsize *other_received_bytes, + guint *other_received_messages) { data->reliable = reliable; data->stream_api = stream_api; data->n_bytes = n_bytes; + data->n_messages = n_messages; data->transmit.buffer_size_strategy = transmit_buffer_size_strategy; data->transmit.buffer_count_strategy = transmit_buffer_count_strategy; data->transmit.message_count_strategy = transmit_message_count_strategy; @@ -913,6 +1041,9 @@ test_data_init (TestData *data, gboolean reliable, StreamApi stream_api, data->transmitted_bytes = 0; data->received_bytes = 0; data->other_received_bytes = other_received_bytes; + data->transmitted_messages = 0; + data->received_messages = 0; + data->other_received_messages = other_received_messages; } /* @@ -926,7 +1057,7 @@ test_data_clear (TestData *data) } static void -test (gboolean reliable, StreamApi stream_api, gsize n_bytes, +test (gboolean reliable, StreamApi stream_api, gsize n_bytes, guint n_messages, BufferSizeStrategy transmit_buffer_size_strategy, BufferCountStrategy transmit_buffer_count_strategy, MessageCountStrategy transmit_message_count_strategy, @@ -950,18 +1081,18 @@ test (gboolean reliable, StreamApi stream_api, gsize n_bytes, NULL, NULL }, /* STREAM_GSOURCE */ }; - test_data_init (&l_data, reliable, stream_api, n_bytes, + test_data_init (&l_data, reliable, stream_api, n_bytes, n_messages, transmit_buffer_size_strategy, transmit_buffer_count_strategy, transmit_message_count_strategy, receive_buffer_size_strategy, receive_buffer_count_strategy, receive_message_count_strategy, buffer_data_strategy, transmit_seed, receive_seed, - &r_data.received_bytes); - test_data_init (&r_data, reliable, stream_api, n_bytes, + &r_data.received_bytes, &r_data.received_messages); + test_data_init (&r_data, reliable, stream_api, n_bytes, n_messages, transmit_buffer_size_strategy, transmit_buffer_count_strategy, transmit_message_count_strategy, receive_buffer_size_strategy, receive_buffer_count_strategy, receive_message_count_strategy, buffer_data_strategy, transmit_seed, receive_seed, - &l_data.received_bytes); + &l_data.received_bytes, &l_data.received_messages); run_io_stream_test (deadlock_timeout, reliable, &callbacks[stream_api], &l_data, NULL, &r_data, NULL); @@ -973,7 +1104,8 @@ test (gboolean reliable, StreamApi stream_api, gsize n_bytes, /* Options with default values. */ guint32 option_transmit_seed = 0; guint32 option_receive_seed = 0; -gsize option_n_bytes = 100000; +gsize option_n_bytes = 10000; +guint option_n_messages = 50; guint option_timeout = 1200; /* seconds */ gboolean option_long_mode = FALSE; @@ -983,7 +1115,9 @@ static GOptionEntry entries[] = { { "receive-seed", 0, 0, G_OPTION_ARG_INT, &option_receive_seed, "Seed for reception RNG", "S" }, { "n-bytes", 'n', 0, G_OPTION_ARG_INT64, &option_n_bytes, - "Number of bytes to send in each test (default 100000)", "N" }, + "Number of bytes to send in each test (default 10000)", "N" }, + { "n-messages", 'm', 0, G_OPTION_ARG_INT64, &option_n_messages, + "Number of messages to send in each test (default 50)", "M" }, { "timeout", 't', 0, G_OPTION_ARG_INT, &option_timeout, "Deadlock detection timeout length, in seconds (default: 1200)", "S" }, { "long-mode", 'l', 0, G_OPTION_ARG_NONE, &option_long_mode, @@ -1006,6 +1140,7 @@ main (int argc, char *argv[]) guint32 transmit_seed; guint32 receive_seed; gsize n_bytes; + guint n_messages; guint deadlock_timeout; gboolean long_mode; GOptionContext *context; @@ -1026,6 +1161,7 @@ main (int argc, char *argv[]) transmit_seed = option_transmit_seed; receive_seed = option_receive_seed; n_bytes = option_n_bytes; + n_messages = option_n_messages; deadlock_timeout = option_timeout; long_mode = option_long_mode; @@ -1073,12 +1209,14 @@ main (int argc, char *argv[]) receive_message_count_strategy = MESSAGE_COUNT_CONSTANT_ONE; } - g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, " + g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, " "%u, %u, %u, %u)…", - reliable, stream_api, n_bytes, transmit_buffer_size_strategy, + reliable, stream_api, n_bytes, n_messages, + transmit_buffer_size_strategy, receive_buffer_size_strategy, buffer_data_strategy, transmit_seed, receive_seed); - test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy, + test (reliable, stream_api, n_bytes, n_messages, + transmit_buffer_size_strategy, transmit_buffer_count_strategy, transmit_message_count_strategy, receive_buffer_size_strategy, receive_buffer_count_strategy, receive_message_count_strategy, buffer_data_strategy, @@ -1129,14 +1267,16 @@ main (int argc, char *argv[]) receive_message_count_strategy != MESSAGE_COUNT_CONSTANT_ONE)) continue; - g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, " + g_debug ("Running test (%u, %u, %" G_GSIZE_FORMAT ", %u, %u, " "%u, %u, %u, %u, %u, %u, %u, %u)…", - reliable, stream_api, n_bytes, transmit_buffer_size_strategy, + reliable, stream_api, n_bytes, n_messages, + transmit_buffer_size_strategy, transmit_buffer_count_strategy, transmit_message_count_strategy, receive_buffer_size_strategy, receive_buffer_count_strategy, receive_message_count_strategy, buffer_data_strategy, transmit_seed, receive_seed); - test (reliable, stream_api, n_bytes, transmit_buffer_size_strategy, + test (reliable, stream_api, n_bytes, n_messages, + transmit_buffer_size_strategy, transmit_buffer_count_strategy, transmit_message_count_strategy, receive_buffer_size_strategy, receive_buffer_count_strategy, receive_message_count_strategy, buffer_data_strategy, |