diff options
author | Edward Hervey <edward@collabora.com> | 2013-07-29 08:10:07 +0200 |
---|---|---|
committer | Edward Hervey <edward@collabora.com> | 2013-09-28 13:15:43 +0200 |
commit | 0e9ce593bf72b05b70da7ecceec0db86ceb17d08 (patch) | |
tree | bee2d159aec3fdda40b8a4c35c00a15494a113ef /gst/mpegtsdemux | |
parent | a4ee1abb1532968a56db4a79e5bdb0a31521ccad (diff) | |
download | gstreamer-plugins-bad-0e9ce593bf72b05b70da7ecceec0db86ceb17d08.tar.gz |
tsdemux: Wait for valid PCR/offset obvervations
It is quite possible that we might get PTS/DTS before the first
PCR/Offset observation.
In order to end up with valid timestamp we wait until at least one
stream was able to get a proper running-time for any PTS/DTS.
Until then, we queue up the pending buffers to push out.
Once we see a first valid timestamp, we re-evaluate the amount of
running-time elapsed (based on returned inital running-time and amount
of data/DTS queued up) for any given stream.
Taking the biggest amount of elapsed time, we set that on the packetizer
as the initial offset and recalculate all pending buffers running-time
PTS/DTS.
Note: The buffer queueing system can also be used later on for the
dvb fast start proposal (where we queue up all stream packets before
seeing PAT/PMT and then push them once we know if they belong to the
chosen program).
Diffstat (limited to 'gst/mpegtsdemux')
-rw-r--r-- | gst/mpegtsdemux/tsdemux.c | 230 |
1 files changed, 209 insertions, 21 deletions
diff --git a/gst/mpegtsdemux/tsdemux.c b/gst/mpegtsdemux/tsdemux.c index 40fe151db..ec3d67cf7 100644 --- a/gst/mpegtsdemux/tsdemux.c +++ b/gst/mpegtsdemux/tsdemux.c @@ -97,6 +97,16 @@ typedef enum * Drop all incoming buffers */ } PendingPacketState; +/* Pending buffer */ +typedef struct +{ + /* The fully reconstructed buffer */ + GstBuffer *buffer; + + /* Raw PTS/DTS (in 90kHz units) */ + guint64 pts, dts; +} PendingBuffer; + typedef struct _TSDemuxStream TSDemuxStream; struct _TSDemuxStream @@ -104,35 +114,49 @@ struct _TSDemuxStream MpegTSBaseStream stream; GstPad *pad; + /* Whether the pad was added or not */ gboolean active; + /* TRUE if we are waiting for a valid timestamp */ + gboolean pending_ts; + /* the return of the latest push */ GstFlowReturn flow_return; /* Output data */ PendingPacketState state; - /* Data to push (allocated) */ + /* Data being reconstructed (allocated) */ guint8 *data; - /* Size of data to push (if known) */ + /* Size of data being reconstructed (if known, else 0) */ guint expected_size; - /* Size of currently queued data */ + /* Amount of bytes in current ->data */ guint current_size; + /* Size of ->data */ guint allocated_size; - /* Current PTS/DTS for this stream */ + /* Current PTS/DTS for this stream (in running time) */ GstClockTime pts; GstClockTime dts; + /* Current PTS/DTS for this stream (in 90kHz unit) */ + guint64 raw_pts, raw_dts; + /* Whether this stream needs to send a newsegment */ gboolean need_newsegment; + /* The value to use when calculating the newsegment */ + GstClockTime first_dts; + GstTagList *taglist; gint continuity_counter; + + /* List of pending buffers */ + GList *pending; }; #define VIDEO_CAPS \ @@ -1059,6 +1083,10 @@ gst_ts_demux_stream_added (MpegTSBase * base, MpegTSBaseStream * bstream, stream->need_newsegment = TRUE; stream->pts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE; + stream->raw_pts = -1; + stream->raw_dts = -1; + stream->pending_ts = TRUE; + stream->first_dts = GST_CLOCK_TIME_NONE; stream->continuity_counter = CONTINUITY_UNSET; } stream->flow_return = GST_FLOW_OK; @@ -1121,8 +1149,6 @@ activate_pad_for_stream (GstTSDemux * tsdemux, TSDemuxStream * stream) static void gst_ts_demux_stream_flush (TSDemuxStream * stream) { - stream->pts = GST_CLOCK_TIME_NONE; - GST_DEBUG ("flushing stream %p", stream); if (stream->data) @@ -1135,6 +1161,9 @@ gst_ts_demux_stream_flush (TSDemuxStream * stream) stream->need_newsegment = TRUE; stream->pts = GST_CLOCK_TIME_NONE; stream->dts = GST_CLOCK_TIME_NONE; + stream->first_dts = GST_CLOCK_TIME_NONE; + stream->raw_pts = -1; + stream->raw_dts = -1; if (stream->flow_return == GST_FLOW_FLUSHING) { stream->flow_return = GST_FLOW_OK; } @@ -1190,6 +1219,7 @@ gst_ts_demux_record_pts (GstTSDemux * demux, TSDemuxStream * stream, { MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; + stream->raw_pts = pts; if (pts == -1) { stream->pts = GST_CLOCK_TIME_NONE; return; @@ -1223,6 +1253,7 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream, { MpegTSBaseStream *bs = (MpegTSBaseStream *) stream; + stream->raw_dts = dts; if (dts == -1) { stream->dts = GST_CLOCK_TIME_NONE; return; @@ -1250,6 +1281,131 @@ gst_ts_demux_record_dts (GstTSDemux * demux, TSDemuxStream * stream, } } +/* This is called when we haven't got a valid initial PTS/DTS on all streams */ +static gboolean +check_pending_buffers (GstTSDemux * demux, TSDemuxStream * stream) +{ + gboolean have_observation = FALSE; + /* The biggest offset */ + guint64 offset = 0; + GList *tmp; + + /* 1. Go over all streams */ + for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { + TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data; + /* 1.1 check if at least one stream got a valid DTS */ + if ((tmpstream->raw_dts != -1 && tmpstream->dts != GST_CLOCK_TIME_NONE) || + (tmpstream->raw_pts != -1 && tmpstream->pts != GST_CLOCK_TIME_NONE)) { + have_observation = TRUE; + break; + } + } + + /* 2. If we don't have a valid value yet, break out */ + if (have_observation == FALSE) + return FALSE; + + /* 3. Go over all streams that have current/pending data */ + for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { + TSDemuxStream *tmpstream = (TSDemuxStream *) tmp->data; + PendingBuffer *pend; + guint64 firstval, lastval, ts; + + /* 3.1 Calculate the offset between current DTS and first DTS */ + if (tmpstream->pending == NULL || tmpstream->state == PENDING_PACKET_EMPTY) + continue; + /* If we don't have any pending data, the offset is 0 for this stream */ + if (tmpstream->pending == NULL) + break; + if (tmpstream->raw_dts != -1) + lastval = tmpstream->raw_dts; + else if (tmpstream->raw_pts != -1) + lastval = tmpstream->raw_pts; + else { + GST_WARNING ("Don't have a last DTS/PTS to use for offset recalculation"); + continue; + } + pend = tmpstream->pending->data; + if (pend->dts != -1) + firstval = pend->dts; + else if (pend->pts != -1) + firstval = pend->pts; + else { + GST_WARNING + ("Don't have a first DTS/PTS to use for offset recalculation"); + continue; + } + /* 3.2 Add to the offset the report TS for the current DTS */ + ts = mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux), + MPEGTIME_TO_GSTTIME (lastval), demux->program->pcr_pid); + if (ts == GST_CLOCK_TIME_NONE) { + GST_WARNING ("THIS SHOULD NOT HAPPEN !"); + continue; + } + ts += MPEGTIME_TO_GSTTIME (lastval - firstval); + /* 3.3 If that offset is bigger than the current offset, store it */ + if (ts > offset) + offset = ts; + } + + GST_DEBUG ("New initial pcr_offset %" GST_TIME_FORMAT, + GST_TIME_ARGS (offset)); + + /* 4. Set the offset on the packetizer */ + mpegts_packetizer_set_current_pcr_offset (MPEG_TS_BASE_PACKETIZER (demux), + offset, demux->program->pcr_pid); + + /* 4. Go over all streams */ + for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { + TSDemuxStream *stream = (TSDemuxStream *) tmp->data; + + stream->pending_ts = FALSE; + /* 4.1 Set pending_ts for FALSE */ + + /* 4.2 Recalculate PTS/DTS (in running time) for pending data */ + if (stream->pending) { + GList *tmp2; + for (tmp2 = stream->pending; tmp2; tmp2 = tmp2->next) { + PendingBuffer *pend = (PendingBuffer *) tmp2->data; + if (pend->pts != -1) + GST_BUFFER_PTS (pend->buffer) = + mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux), + MPEGTIME_TO_GSTTIME (pend->pts), demux->program->pcr_pid); + if (pend->dts != -1) + GST_BUFFER_DTS (pend->buffer) = + mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux), + MPEGTIME_TO_GSTTIME (pend->dts), demux->program->pcr_pid); + /* 4.2.2 Set first_dts to TS of lowest DTS (for segment) */ + if (stream->first_dts == GST_CLOCK_TIME_NONE) { + if (GST_BUFFER_DTS (pend->buffer) != GST_CLOCK_TIME_NONE) + stream->first_dts = GST_BUFFER_DTS (pend->buffer); + else if (GST_BUFFER_PTS (pend->buffer) != GST_CLOCK_TIME_NONE) + stream->first_dts = GST_BUFFER_PTS (pend->buffer); + } + } + } + /* Recalculate PTS/DTS (in running time) for current data */ + if (stream->state != PENDING_PACKET_EMPTY) { + if (stream->raw_dts != -1) { + stream->dts = + mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux), + MPEGTIME_TO_GSTTIME (stream->raw_dts), demux->program->pcr_pid); + if (stream->first_dts == GST_CLOCK_TIME_NONE) + stream->first_dts = stream->dts; + } + if (stream->raw_pts != -1) { + stream->pts = + mpegts_packetizer_pts_to_ts (MPEG_TS_BASE_PACKETIZER (demux), + MPEGTIME_TO_GSTTIME (stream->raw_pts), demux->program->pcr_pid); + if (stream->first_dts == GST_CLOCK_TIME_NONE) + stream->first_dts = stream->pts; + } + } + } + + return TRUE; +} + static void gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream, guint8 * data, guint32 length, guint64 bufferoffset) @@ -1270,6 +1426,17 @@ gst_ts_demux_parse_pes_header (GstTSDemux * demux, TSDemuxStream * stream, gst_ts_demux_record_dts (demux, stream, header.DTS, bufferoffset); gst_ts_demux_record_pts (demux, stream, header.PTS, bufferoffset); + if (G_UNLIKELY (stream->pending_ts && + (stream->pts != GST_CLOCK_TIME_NONE + || stream->dts != GST_CLOCK_TIME_NONE))) { + GST_DEBUG ("Got pts/dts update, rechecking all streams"); + check_pending_buffers (demux, stream); + } else if (stream->first_dts == GST_CLOCK_TIME_NONE) { + if (GST_CLOCK_TIME_IS_VALID (stream->dts)) + stream->first_dts = stream->dts; + else if (GST_CLOCK_TIME_IS_VALID (stream->pts)) + stream->first_dts = stream->pts; + } GST_DEBUG_OBJECT (demux, "stream PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT, @@ -1413,13 +1580,10 @@ calculate_and_push_newsegment (GstTSDemux * demux, TSDemuxStream * stream) for (tmp = demux->program->stream_list; tmp; tmp = tmp->next) { TSDemuxStream *pstream = (TSDemuxStream *) tmp->data; - if (GST_CLOCK_TIME_IS_VALID (pstream->pts)) { - if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->pts < lowest_pts) - lowest_pts = pstream->pts; - } - if (GST_CLOCK_TIME_IS_VALID (pstream->dts)) { - if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) || pstream->dts < lowest_pts) - lowest_pts = pstream->dts; + if (GST_CLOCK_TIME_IS_VALID (pstream->first_dts)) { + if (!GST_CLOCK_TIME_IS_VALID (lowest_pts) + || pstream->first_dts < lowest_pts) + lowest_pts = pstream->first_dts; } } if (GST_CLOCK_TIME_IS_VALID (lowest_pts)) @@ -1515,24 +1679,48 @@ gst_ts_demux_push_pending_data (GstTSDemux * demux, TSDemuxStream * stream) goto beach; } - if (G_UNLIKELY (!stream->active)) - activate_pad_for_stream (demux, stream); - - if (G_UNLIKELY (stream->pad == NULL)) { + if (G_UNLIKELY (demux->program == NULL)) { + GST_LOG_OBJECT (demux, "No program"); g_free (stream->data); goto beach; } - if (G_UNLIKELY (demux->program == NULL)) { - GST_LOG_OBJECT (demux, "No program"); - g_free (stream->data); + buffer = gst_buffer_new_wrapped (stream->data, stream->current_size); + + if (G_UNLIKELY (stream->pending_ts && !check_pending_buffers (demux, stream))) { + PendingBuffer *pend; + pend = g_slice_new0 (PendingBuffer); + pend->buffer = buffer; + pend->pts = stream->raw_pts; + pend->dts = stream->raw_dts; + stream->pending = g_list_append (stream->pending, pend); + GST_DEBUG ("Not enough information to push buffers yet, storing buffer"); goto beach; } + if (G_UNLIKELY (!stream->active)) + activate_pad_for_stream (demux, stream); + if (G_UNLIKELY (stream->need_newsegment)) calculate_and_push_newsegment (demux, stream); - buffer = gst_buffer_new_wrapped (stream->data, stream->current_size); + /* FIXME : Push pending buffers if any */ + if (G_UNLIKELY (stream->pending)) { + GList *tmp; + for (tmp = stream->pending; tmp; tmp = tmp->next) { + PendingBuffer *pend = (PendingBuffer *) tmp->data; + + GST_DEBUG_OBJECT (stream->pad, + "Pushing pending buffer PTS:%" GST_TIME_FORMAT " DTS:%" + GST_TIME_FORMAT, GST_TIME_ARGS (GST_BUFFER_PTS (pend->buffer)), + GST_TIME_ARGS (GST_BUFFER_DTS (pend->buffer))); + + res = gst_pad_push (stream->pad, pend->buffer); + g_slice_free (PendingBuffer, pend); + } + g_list_free (stream->pending); + stream->pending = NULL; + } GST_DEBUG_OBJECT (stream->pad, "stream->pts %" GST_TIME_FORMAT, GST_TIME_ARGS (stream->pts)); |