diff options
author | Sanchayan Maity <sanchayan@asymptotic.io> | 2020-09-07 18:35:40 +0530 |
---|---|---|
committer | Sanchayan Maity <sanchayan@asymptotic.io> | 2020-09-21 10:06:48 +0530 |
commit | 410db7d21651877dc15936f5449784e07b55fb01 (patch) | |
tree | c6dba1b3550a41a2c583bc27e17d710a1a3d70f7 | |
parent | faf027050ed4db98286290953acb792f8991002f (diff) | |
download | pulseaudio-410db7d21651877dc15936f5449784e07b55fb01.tar.gz |
rtp: Fix sending of small packets
The current implementation for RTP send isn't optimised for sending MTU
bytes of data like rtp-native. For eg. if MTU is 1280 bytes and we have
to send 1276 bytes, two packets are send out one of 1268 bytes and other
of 8 bytes. Sending out a packet of 8 bytes has a significant overhead
and we should be sending MTU bytes of data.
Fix this by accumulating MTU bytes of data and sending data only on
accumulation of MTU worth of data.
-rw-r--r-- | src/modules/rtp/rtp-gstreamer.c | 107 |
1 files changed, 74 insertions, 33 deletions
diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c index 0db330958..2fe0b02e1 100644 --- a/src/modules/rtp/rtp-gstreamer.c +++ b/src/modules/rtp/rtp-gstreamer.c @@ -43,6 +43,7 @@ } #define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL) +#define RTP_HEADER_SIZE 12 struct pa_rtp_context { pa_fdsem *fdsem; @@ -53,6 +54,9 @@ struct pa_rtp_context { GstElement *appsink; uint32_t last_timestamp; + + uint8_t *send_buf; + size_t mtu; }; static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) { @@ -171,6 +175,8 @@ pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, con c = pa_xnew0(pa_rtp_context, 1); c->ss = *ss; + c->mtu = mtu - RTP_HEADER_SIZE; + c->send_buf = pa_xmalloc(c->mtu); if (!gst_init_check(NULL, NULL, &error)) { pa_log_error("Could not initialise GStreamer: %s", error->message); @@ -216,18 +222,10 @@ static bool process_bus_messages(pa_rtp_context *c) { return ret; } -static void free_buffer(pa_memblock *memblock) { - pa_memblock_release(memblock); - pa_memblock_unref(memblock); -} - /* Called from I/O thread context */ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { - pa_memchunk chunk = { 0, }; GstBuffer *buf; - void *data; - bool stop = false; - int ret = 0; + size_t n = 0; pa_assert(c); pa_assert(q); @@ -235,40 +233,81 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) { if (!process_bus_messages(c)) return -1; - while (!stop && pa_memblockq_peek(q, &chunk) == 0) { - GstClock *clock; - GstClockTime timestamp, clock_time; + /* + * While we check here for atleast MTU worth of data being available in + * memblockq, we might not have exact equivalent to MTU. Hence, we walk + * over the memchunks in memblockq and accumulate MTU bytes next. + */ + if (pa_memblockq_get_length(q) < c->mtu) + return 0; + + for (;;) { + pa_memchunk chunk; + int r; + + pa_memchunk_reset(&chunk); + + if ((r = pa_memblockq_peek(q, &chunk)) >= 0) { + /* + * Accumulate MTU bytes of data before sending. If the current + * chunk length + accumulated bytes exceeds MTU, we drop bytes + * considered for transfer in this iteration from memblockq. + * + * The remaining bytes will be available in the next iteration, + * as these will be tracked and maintained by memblockq. + */ + size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length; + + pa_assert(chunk.memblock); + + memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k); + pa_memblock_release(chunk.memblock); + pa_memblock_unref(chunk.memblock); + + n += k; + pa_memblockq_drop(q, k); + } - clock = gst_element_get_clock(c->pipeline); - clock_time = gst_clock_get_time(clock); - gst_object_unref(clock); + if (r < 0 || n >= c->mtu) { + GstClock *clock; + GstClockTime timestamp, clock_time; + GstMapInfo info; - timestamp = gst_element_get_base_time(c->pipeline); - if (timestamp > clock_time) - timestamp -= clock_time; - else - timestamp = 0; + if (n > 0) { + clock = gst_element_get_clock(c->pipeline); + clock_time = gst_clock_get_time(clock); + gst_object_unref(clock); - pa_assert(chunk.memblock); + timestamp = gst_element_get_base_time(c->pipeline); + if (timestamp > clock_time) + timestamp -= clock_time; + else + timestamp = 0; - data = pa_memblock_acquire(chunk.memblock); + buf = gst_buffer_new_allocate(NULL, n, NULL); + pa_assert(buf); - buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS, - data, chunk.length, chunk.index, chunk.length, chunk.memblock, - (GDestroyNotify) free_buffer); + GST_BUFFER_PTS(buf) = timestamp; - GST_BUFFER_PTS(buf) = timestamp; + pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE)); - if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { - pa_log_error("Could not push buffer"); - stop = true; - ret = -1; - } + memcpy(info.data, c->send_buf, n); + gst_buffer_unmap(buf, &info); - pa_memblockq_drop(q, chunk.length); + if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) { + pa_log_error("Could not push buffer"); + return -1; + } + } + + if (r < 0 || pa_memblockq_get_length(q) < c->mtu) + break; + + n = 0; + } } - return ret; + return 0; } static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) { @@ -415,6 +454,7 @@ pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample c->fdsem = pa_fdsem_new(); c->ss = *ss; + c->send_buf = NULL; if (!gst_init_check(NULL, NULL, &error)) { pa_log_error("Could not initialise GStreamer: %s", error->message); @@ -537,6 +577,7 @@ void pa_rtp_context_free(pa_rtp_context *c) { if (c->appsrc) { gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc)); gst_object_unref(c->appsrc); + pa_xfree(c->send_buf); } if (c->appsink) |