summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSanchayan Maity <sanchayan@asymptotic.io>2020-09-07 18:35:40 +0530
committerSanchayan Maity <sanchayan@asymptotic.io>2020-09-21 10:06:48 +0530
commit410db7d21651877dc15936f5449784e07b55fb01 (patch)
treec6dba1b3550a41a2c583bc27e17d710a1a3d70f7
parentfaf027050ed4db98286290953acb792f8991002f (diff)
downloadpulseaudio-410db7d21651877dc15936f5449784e07b55fb01.tar.gz
rtp: Fix sending of small packets
The current implementation for RTP send isn't optimised for sending MTU bytes of data like rtp-native. For eg. if MTU is 1280 bytes and we have to send 1276 bytes, two packets are send out one of 1268 bytes and other of 8 bytes. Sending out a packet of 8 bytes has a significant overhead and we should be sending MTU bytes of data. Fix this by accumulating MTU bytes of data and sending data only on accumulation of MTU worth of data.
-rw-r--r--src/modules/rtp/rtp-gstreamer.c107
1 files changed, 74 insertions, 33 deletions
diff --git a/src/modules/rtp/rtp-gstreamer.c b/src/modules/rtp/rtp-gstreamer.c
index 0db330958..2fe0b02e1 100644
--- a/src/modules/rtp/rtp-gstreamer.c
+++ b/src/modules/rtp/rtp-gstreamer.c
@@ -43,6 +43,7 @@
}
#define MAKE_ELEMENT(v, e) MAKE_ELEMENT_NAMED((v), (e), NULL)
+#define RTP_HEADER_SIZE 12
struct pa_rtp_context {
pa_fdsem *fdsem;
@@ -53,6 +54,9 @@ struct pa_rtp_context {
GstElement *appsink;
uint32_t last_timestamp;
+
+ uint8_t *send_buf;
+ size_t mtu;
};
static GstCaps* caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -171,6 +175,8 @@ pa_rtp_context* pa_rtp_context_new_send(int fd, uint8_t payload, size_t mtu, con
c = pa_xnew0(pa_rtp_context, 1);
c->ss = *ss;
+ c->mtu = mtu - RTP_HEADER_SIZE;
+ c->send_buf = pa_xmalloc(c->mtu);
if (!gst_init_check(NULL, NULL, &error)) {
pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -216,18 +222,10 @@ static bool process_bus_messages(pa_rtp_context *c) {
return ret;
}
-static void free_buffer(pa_memblock *memblock) {
- pa_memblock_release(memblock);
- pa_memblock_unref(memblock);
-}
-
/* Called from I/O thread context */
int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
- pa_memchunk chunk = { 0, };
GstBuffer *buf;
- void *data;
- bool stop = false;
- int ret = 0;
+ size_t n = 0;
pa_assert(c);
pa_assert(q);
@@ -235,40 +233,81 @@ int pa_rtp_send(pa_rtp_context *c, pa_memblockq *q) {
if (!process_bus_messages(c))
return -1;
- while (!stop && pa_memblockq_peek(q, &chunk) == 0) {
- GstClock *clock;
- GstClockTime timestamp, clock_time;
+ /*
+ * While we check here for atleast MTU worth of data being available in
+ * memblockq, we might not have exact equivalent to MTU. Hence, we walk
+ * over the memchunks in memblockq and accumulate MTU bytes next.
+ */
+ if (pa_memblockq_get_length(q) < c->mtu)
+ return 0;
+
+ for (;;) {
+ pa_memchunk chunk;
+ int r;
+
+ pa_memchunk_reset(&chunk);
+
+ if ((r = pa_memblockq_peek(q, &chunk)) >= 0) {
+ /*
+ * Accumulate MTU bytes of data before sending. If the current
+ * chunk length + accumulated bytes exceeds MTU, we drop bytes
+ * considered for transfer in this iteration from memblockq.
+ *
+ * The remaining bytes will be available in the next iteration,
+ * as these will be tracked and maintained by memblockq.
+ */
+ size_t k = n + chunk.length > c->mtu ? c->mtu - n : chunk.length;
+
+ pa_assert(chunk.memblock);
+
+ memcpy(c->send_buf + n, pa_memblock_acquire_chunk(&chunk), k);
+ pa_memblock_release(chunk.memblock);
+ pa_memblock_unref(chunk.memblock);
+
+ n += k;
+ pa_memblockq_drop(q, k);
+ }
- clock = gst_element_get_clock(c->pipeline);
- clock_time = gst_clock_get_time(clock);
- gst_object_unref(clock);
+ if (r < 0 || n >= c->mtu) {
+ GstClock *clock;
+ GstClockTime timestamp, clock_time;
+ GstMapInfo info;
- timestamp = gst_element_get_base_time(c->pipeline);
- if (timestamp > clock_time)
- timestamp -= clock_time;
- else
- timestamp = 0;
+ if (n > 0) {
+ clock = gst_element_get_clock(c->pipeline);
+ clock_time = gst_clock_get_time(clock);
+ gst_object_unref(clock);
- pa_assert(chunk.memblock);
+ timestamp = gst_element_get_base_time(c->pipeline);
+ if (timestamp > clock_time)
+ timestamp -= clock_time;
+ else
+ timestamp = 0;
- data = pa_memblock_acquire(chunk.memblock);
+ buf = gst_buffer_new_allocate(NULL, n, NULL);
+ pa_assert(buf);
- buf = gst_buffer_new_wrapped_full(GST_MEMORY_FLAG_READONLY | GST_MEMORY_FLAG_PHYSICALLY_CONTIGUOUS,
- data, chunk.length, chunk.index, chunk.length, chunk.memblock,
- (GDestroyNotify) free_buffer);
+ GST_BUFFER_PTS(buf) = timestamp;
- GST_BUFFER_PTS(buf) = timestamp;
+ pa_assert_se(gst_buffer_map(buf, &info, GST_MAP_WRITE));
- if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
- pa_log_error("Could not push buffer");
- stop = true;
- ret = -1;
- }
+ memcpy(info.data, c->send_buf, n);
+ gst_buffer_unmap(buf, &info);
- pa_memblockq_drop(q, chunk.length);
+ if (gst_app_src_push_buffer(GST_APP_SRC(c->appsrc), buf) != GST_FLOW_OK) {
+ pa_log_error("Could not push buffer");
+ return -1;
+ }
+ }
+
+ if (r < 0 || pa_memblockq_get_length(q) < c->mtu)
+ break;
+
+ n = 0;
+ }
}
- return ret;
+ return 0;
}
static GstCaps* rtp_caps_from_sample_spec(const pa_sample_spec *ss) {
@@ -415,6 +454,7 @@ pa_rtp_context* pa_rtp_context_new_recv(int fd, uint8_t payload, const pa_sample
c->fdsem = pa_fdsem_new();
c->ss = *ss;
+ c->send_buf = NULL;
if (!gst_init_check(NULL, NULL, &error)) {
pa_log_error("Could not initialise GStreamer: %s", error->message);
@@ -537,6 +577,7 @@ void pa_rtp_context_free(pa_rtp_context *c) {
if (c->appsrc) {
gst_app_src_end_of_stream(GST_APP_SRC(c->appsrc));
gst_object_unref(c->appsrc);
+ pa_xfree(c->send_buf);
}
if (c->appsink)