summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEdward Hervey <edward@centricular.com>2020-10-06 11:45:36 +0200
committerTim-Philipp Müller <tim@centricular.com>2020-10-11 18:22:28 +0000
commitc1dcef030a3885d3b420125a78d337dccdd97d85 (patch)
treefc9a44079756a24c3d0c01b4a8b34c0e044cce97
parent2169a16a070e92dc5a0e6a476ba12562f2541700 (diff)
downloadgstreamer-plugins-bad-c1dcef030a3885d3b420125a78d337dccdd97d85.tar.gz
srtsrc: Fix timestamping
SRT provides the original timestamp of a packet (with drift/skew corrected for local clock), which is what should be used for timestamping the outgoing buffers. This ensures that we output the packets with the same timestamp (and by extension rate) as the original feed. Also detect if packets were dropped (by checking the sequence number) and properly set DISCONT flag on the outgoing buffer. Finally answer the latency queries Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-bad/-/merge_requests/1677>
-rw-r--r--ext/srt/gstsrtobject.c6
-rw-r--r--ext/srt/gstsrtobject.h3
-rw-r--r--ext/srt/gstsrtsrc.c74
-rw-r--r--ext/srt/gstsrtsrc.h2
4 files changed, 80 insertions, 5 deletions
diff --git a/ext/srt/gstsrtobject.c b/ext/srt/gstsrtobject.c
index 43b1fdc68..03a16f3e2 100644
--- a/ext/srt/gstsrtobject.c
+++ b/ext/srt/gstsrtobject.c
@@ -1249,7 +1249,8 @@ gst_srt_object_wait_caller (GstSRTObject * srtobject,
gssize
gst_srt_object_read (GstSRTObject * srtobject,
- guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
+ guint8 * data, gsize size, GCancellable * cancellable, GError ** error,
+ SRT_MSGCTRL * mctrl)
{
gssize len = 0;
gint poll_timeout;
@@ -1337,7 +1338,8 @@ gst_srt_object_read (GstSRTObject * srtobject,
}
- len = srt_recvmsg (rsock, (char *) (data), size);
+ srt_msgctrl_init (mctrl);
+ len = srt_recvmsg2 (rsock, (char *) (data), size, mctrl);
if (len == SRT_ERROR) {
gint srt_errno = srt_getlasterror (NULL);
diff --git a/ext/srt/gstsrtobject.h b/ext/srt/gstsrtobject.h
index 302aa89cc..09950feaf 100644
--- a/ext/srt/gstsrtobject.h
+++ b/ext/srt/gstsrtobject.h
@@ -98,7 +98,8 @@ gboolean gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar *u
gssize gst_srt_object_read (GstSRTObject * srtobject,
guint8 *data, gsize size,
GCancellable *cancellable,
- GError **err);
+ GError **err,
+ SRT_MSGCTRL *mctrl);
gssize gst_srt_object_write (GstSRTObject * srtobject,
GstBufferList * headers,
diff --git a/ext/srt/gstsrtsrc.c b/ext/srt/gstsrtsrc.c
index 2b898ea79..c886e39f3 100644
--- a/ext/srt/gstsrtsrc.c
+++ b/ext/srt/gstsrtsrc.c
@@ -97,6 +97,9 @@ gst_srt_src_start (GstBaseSrc * bsrc)
g_clear_error (&error);
}
+ /* Reset expected pktseq */
+ self->next_pktseq = 0;
+
return ret;
}
@@ -118,6 +121,11 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
GstMapInfo info;
GError *err = NULL;
gssize recv_len;
+ GstClock *clock;
+ GstClockTime base_time;
+ GstClockTime capture_time;
+ GstClockTime delay;
+ SRT_MSGCTRL mctrl;
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
@@ -130,11 +138,30 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
goto out;
}
+ /* Get clock and values */
+ clock = gst_element_get_clock (GST_ELEMENT (src));
+ base_time = gst_element_get_base_time (GST_ELEMENT (src));
+
recv_len = gst_srt_object_read (self->srtobject, info.data,
- gst_buffer_get_size (outbuf), self->cancellable, &err);
+ gst_buffer_get_size (outbuf), self->cancellable, &err, &mctrl);
+
+ /* Capture clock values ASAP */
+ capture_time = gst_clock_get_time (clock);
+#if SRT_VERSION_VALUE >= 0x10402
+ /* Use SRT clock value if available (SRT > 1.4.2) */
+ delay = (srt_time_now () - mctrl.srctime) * GST_USECOND;
+#else
+ /* Else use the unix epoch monotonic clock */
+ delay = (g_get_real_time () - mctrl.srctime) * GST_USECOND;
+#endif
+ gst_object_unref (clock);
gst_buffer_unmap (outbuf, &info);
+ GST_LOG_OBJECT (src,
+ "recv_len:%" G_GSIZE_FORMAT " pktseq:%d msgno:%d srctime:%"
+ G_GUINT64_FORMAT, recv_len, mctrl.pktseq, mctrl.msgno, mctrl.srctime);
+
if (g_cancellable_is_cancelled (self->cancellable)) {
ret = GST_FLOW_FLUSHING;
goto out;
@@ -150,6 +177,29 @@ gst_srt_src_fill (GstPushSrc * src, GstBuffer * outbuf)
goto out;
}
+ /* Detect discontinuities */
+ if (mctrl.pktseq != self->next_pktseq) {
+ GST_WARNING_OBJECT (src, "discont detected %d (expected: %d)",
+ mctrl.pktseq, self->next_pktseq);
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ }
+ /* pktseq is a 31bit field */
+ self->next_pktseq = (mctrl.pktseq + 1) % G_MAXINT32;
+
+ /* Subtract the base_time (since the pipeline started) ... */
+ if (capture_time > base_time)
+ capture_time -= base_time;
+ else
+ capture_time = 0;
+ /* And adjust by the delay */
+ if (capture_time > delay)
+ capture_time -= delay;
+ else
+ capture_time = 0;
+ GST_BUFFER_TIMESTAMP (outbuf) = capture_time;
+
+ GST_DEBUG_OBJECT (src, "delay:%" GST_TIME_FORMAT, GST_TIME_ARGS (delay));
+
gst_buffer_resize (outbuf, 0, recv_len);
GST_LOG_OBJECT (src,
@@ -173,7 +223,8 @@ gst_srt_src_init (GstSRTSrc * self)
gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
- gst_base_src_set_do_timestamp (GST_BASE_SRC (self), TRUE);
+ /* We do the timing ourselves */
+ gst_base_src_set_do_timestamp (GST_BASE_SRC (self), FALSE);
gst_srt_object_set_uri (self->srtobject, GST_SRT_DEFAULT_URI, NULL);
@@ -234,6 +285,24 @@ gst_srt_src_get_property (GObject * object,
}
}
+static gboolean
+gst_srt_src_query (GstBaseSrc * basesrc, GstQuery * query)
+{
+ GstSRTSrc *self = GST_SRT_SRC (basesrc);
+
+ if (GST_QUERY_TYPE (query) == GST_QUERY_LATENCY) {
+ gint latency;
+ if (!gst_structure_get_int (self->srtobject->parameters, "latency",
+ &latency))
+ latency = GST_SRT_DEFAULT_LATENCY;
+ gst_query_set_latency (query, TRUE, latency * GST_MSECOND,
+ latency * GST_MSECOND);
+ return TRUE;
+ } else {
+ return GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
+ }
+}
+
static void
gst_srt_src_class_init (GstSRTSrcClass * klass)
{
@@ -285,6 +354,7 @@ gst_srt_src_class_init (GstSRTSrcClass * klass)
gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_srt_src_stop);
gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_srt_src_unlock);
gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_srt_src_unlock_stop);
+ gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_srt_src_query);
gstpushsrc_class->fill = GST_DEBUG_FUNCPTR (gst_srt_src_fill);
}
diff --git a/ext/srt/gstsrtsrc.h b/ext/srt/gstsrtsrc.h
index af0b83362..0ca9a4b84 100644
--- a/ext/srt/gstsrtsrc.h
+++ b/ext/srt/gstsrtsrc.h
@@ -49,6 +49,8 @@ struct _GstSRTSrc {
GstSRTObject *srtobject;
GCancellable *cancellable;
+
+ guint32 next_pktseq;
};
struct _GstSRTSrcClass {