diff options
Diffstat (limited to 'gst/rtpmux')
-rw-r--r-- | gst/rtpmux/gstrtpdtmfmux.c | 47 | ||||
-rw-r--r-- | gst/rtpmux/gstrtpmux.c | 158 | ||||
-rw-r--r-- | gst/rtpmux/gstrtpmux.h | 30 |
3 files changed, 156 insertions, 79 deletions
diff --git a/gst/rtpmux/gstrtpdtmfmux.c b/gst/rtpmux/gstrtpdtmfmux.c index f50487447..9fd77c3e0 100644 --- a/gst/rtpmux/gstrtpdtmfmux.c +++ b/gst/rtpmux/gstrtpdtmfmux.c @@ -66,7 +66,8 @@ static GstPad *gst_rtp_dtmf_mux_request_new_pad (GstElement * element, static GstStateChangeReturn gst_rtp_dtmf_mux_change_state (GstElement * element, GstStateChange transition); -static GstFlowReturn gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer); +static gboolean gst_rtp_dtmf_mux_accept_buffer_locked (GstRTPMux * rtp_mux, + GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer); GST_BOILERPLATE (GstRTPDTMFMux, gst_rtp_dtmf_mux, GstRTPMux, GST_TYPE_RTP_MUX); @@ -104,24 +105,19 @@ gst_rtp_dtmf_mux_class_init (GstRTPDTMFMuxClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_request_new_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_change_state); - gstrtpmux_class->chain_func = GST_DEBUG_FUNCPTR (gst_rtp_dtmf_mux_chain); + gstrtpmux_class->accept_buffer_locked = gst_rtp_dtmf_mux_accept_buffer_locked; } -static GstFlowReturn -gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer) +static gboolean +gst_rtp_dtmf_mux_accept_buffer_locked (GstRTPMux * rtp_mux, + GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer) { - GstRTPDTMFMux *mux; - GstFlowReturn ret = GST_FLOW_ERROR; - GstRTPMuxPadPrivate *padpriv = NULL; + GstRTPDTMFMux *mux = GST_RTP_DTMF_MUX (rtp_mux); GstClockTime running_ts; - mux = GST_RTP_DTMF_MUX (gst_pad_get_parent (pad)); - running_ts = GST_BUFFER_TIMESTAMP (buffer); - GST_OBJECT_LOCK (mux); if (GST_CLOCK_TIME_IS_VALID (running_ts)) { - padpriv = gst_pad_get_element_private (pad); if (padpriv && padpriv->segment.format == GST_FORMAT_TIME) running_ts = gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME, GST_BUFFER_TIMESTAMP (buffer)); @@ -134,12 +130,12 @@ gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer) mux->last_priority_end); else mux->last_priority_end = running_ts + GST_BUFFER_DURATION (buffer); - GST_LOG_OBJECT (mux, "Got buffer %p on priority pad %s," + GST_LOG_OBJECT (mux, "Got buffer %p on priority pad, " " blocking regular pads until %" GST_TIME_FORMAT, buffer, - GST_PAD_NAME (pad), GST_TIME_ARGS (mux->last_priority_end)); + GST_TIME_ARGS (mux->last_priority_end)); } else { - GST_WARNING_OBJECT (mux, "Buffer %p on pad %s has an invalid duration," - " not blocking other pad", buffer, GST_PAD_NAME (pad)); + GST_WARNING_OBJECT (mux, "Buffer %p has an invalid duration," + " not blocking other pad", buffer); } } else { if (GST_CLOCK_TIME_IS_VALID (mux->last_priority_end) && @@ -147,30 +143,15 @@ gst_rtp_dtmf_mux_chain (GstPad * pad, GstBuffer * buffer) GST_LOG_OBJECT (mux, "Dropping buffer %p because running time" " %" GST_TIME_FORMAT " < %" GST_TIME_FORMAT, buffer, GST_TIME_ARGS (running_ts), GST_TIME_ARGS (mux->last_priority_end)); - goto drop_buffer; + return FALSE; } } } else { - GST_LOG_OBJECT (pad, "Buffer %p has an invalid timestamp," + GST_LOG_OBJECT (mux, "Buffer %p has an invalid timestamp," " letting through", buffer); } - GST_OBJECT_UNLOCK (mux); - - if (parent_class->chain_func) - ret = parent_class->chain_func (pad, buffer); - else - gst_buffer_unref (buffer); - -out: - - gst_object_unref (mux); - return ret; -drop_buffer: - gst_buffer_unref (buffer); - ret = GST_FLOW_OK; - GST_OBJECT_UNLOCK (mux); - goto out; + return TRUE; } diff --git a/gst/rtpmux/gstrtpmux.c b/gst/rtpmux/gstrtpmux.c index 27da522df..babf36d5b 100644 --- a/gst/rtpmux/gstrtpmux.c +++ b/gst/rtpmux/gstrtpmux.c @@ -72,6 +72,8 @@ static GstPad *gst_rtp_mux_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name); static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad); static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer); +static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, + GstBufferList * bufferlist); static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstCaps * caps); static GstCaps *gst_rtp_mux_getcaps (GstPad * pad); static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstEvent * event); @@ -137,8 +139,6 @@ gst_rtp_mux_class_init (GstRTPMuxClass * klass) GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad); gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad); gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state); - - klass->chain_func = gst_rtp_mux_chain; } static void @@ -227,8 +227,9 @@ gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad) /* setup some pad functions */ gst_pad_set_setcaps_function (sinkpad, gst_rtp_mux_setcaps); gst_pad_set_getcaps_function (sinkpad, gst_rtp_mux_getcaps); - if (klass->chain_func) - gst_pad_set_chain_function (sinkpad, klass->chain_func); + gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain)); + gst_pad_set_chain_list_function (sinkpad, + GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list)); gst_pad_set_event_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event)); @@ -287,19 +288,14 @@ gst_rtp_mux_release_pad (GstElement * element, GstPad * pad) /* Put our own clock-base on the buffer */ static void -gst_rtp_mux_readjust_rtp_timestamp (GstRTPMux * rtp_mux, GstPad * pad, - GstBuffer * buffer) +gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux, + GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer) { guint32 ts; guint32 sink_ts_base = 0; - GstRTPMuxPadPrivate *padpriv; - - GST_OBJECT_LOCK (rtp_mux); - padpriv = gst_pad_get_element_private (pad); if (padpriv && padpriv->have_clock_base) sink_ts_base = padpriv->clock_base; - GST_OBJECT_UNLOCK (rtp_mux); ts = gst_rtp_buffer_get_timestamp (buffer) - sink_ts_base + rtp_mux->ts_base; GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u", @@ -307,38 +303,82 @@ gst_rtp_mux_readjust_rtp_timestamp (GstRTPMux * rtp_mux, GstPad * pad, gst_rtp_buffer_set_timestamp (buffer, ts); } +static gboolean +process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv, + GstBuffer * buffer) +{ + GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux); + + if (klass->accept_buffer_locked) + if (!klass->accept_buffer_locked (rtp_mux, padpriv, buffer)) + return FALSE; + + rtp_mux->seqnum++; + gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum); + + gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc); + gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, buffer); + GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u", + GST_BUFFER_SIZE (buffer), rtp_mux->seqnum, + gst_rtp_buffer_get_timestamp (buffer)); + + if (padpriv) { + gst_buffer_set_caps (buffer, padpriv->out_caps); + if (padpriv->segment.format == GST_FORMAT_TIME) + GST_BUFFER_TIMESTAMP (buffer) = + gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME, + GST_BUFFER_TIMESTAMP (buffer)); + } + + return TRUE; +} + static GstFlowReturn -gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer) +gst_rtp_mux_chain_list (GstPad * pad, GstBufferList * bufferlist) { GstRTPMux *rtp_mux; GstFlowReturn ret; + GstBufferListIterator *it; GstRTPMuxPadPrivate *padpriv; GstEvent *newseg_event = NULL; + gboolean drop = TRUE; rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad)); - if (!gst_rtp_buffer_validate (buffer)) { - gst_buffer_unref (buffer); + if (!gst_rtp_buffer_list_validate (bufferlist)) { GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer"); gst_object_unref (rtp_mux); return GST_FLOW_ERROR; } - buffer = gst_buffer_make_writable (buffer); - GST_OBJECT_LOCK (rtp_mux); - rtp_mux->seqnum++; - gst_rtp_buffer_set_seq (buffer, rtp_mux->seqnum); + padpriv = gst_pad_get_element_private (pad); - if (padpriv) { - gst_buffer_set_caps (buffer, padpriv->out_caps); - if (padpriv->segment.format == GST_FORMAT_TIME) - GST_BUFFER_TIMESTAMP (buffer) = - gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME, - GST_BUFFER_TIMESTAMP (buffer)); + if (!padpriv) { + GST_OBJECT_UNLOCK (rtp_mux); + ret = GST_FLOW_NOT_LINKED; + gst_buffer_list_unref (bufferlist); + goto out; + } + + bufferlist = gst_buffer_list_make_writable (bufferlist); + it = gst_buffer_list_iterate (bufferlist); + while (gst_buffer_list_iterator_next_group (it)) { + GstBuffer *rtpbuf; + + rtpbuf = gst_buffer_list_iterator_next (it); + rtpbuf = gst_buffer_make_writable (rtpbuf); + + drop = !process_buffer_locked (rtp_mux, padpriv, rtpbuf); + + if (drop) + break; + + gst_buffer_list_iterator_take (it, rtpbuf); } + gst_buffer_list_iterator_free (it); - if (rtp_mux->segment_pending) { + if (!drop && rtp_mux->segment_pending) { /* * We set the start at 0, because we re-timestamps to the running time */ @@ -347,24 +387,78 @@ gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer) rtp_mux->segment_pending = FALSE; } - GST_OBJECT_UNLOCK (rtp_mux); - gst_rtp_buffer_set_ssrc (buffer, rtp_mux->current_ssrc); - gst_rtp_mux_readjust_rtp_timestamp (rtp_mux, pad, buffer); - GST_LOG_OBJECT (rtp_mux, "Pushing packet size %d, seq=%d, ts=%u", - GST_BUFFER_SIZE (buffer), rtp_mux->seqnum, - gst_rtp_buffer_get_timestamp (buffer)); + GST_OBJECT_UNLOCK (rtp_mux); if (newseg_event) gst_pad_push_event (rtp_mux->srcpad, newseg_event); + if (drop) { + gst_buffer_list_unref (bufferlist); + ret = GST_FLOW_OK; + } else { + ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist); + } + +out: + + gst_object_unref (rtp_mux); + + return ret; +} + +static GstFlowReturn +gst_rtp_mux_chain (GstPad * pad, GstBuffer * buffer) +{ + GstRTPMux *rtp_mux; + GstFlowReturn ret; + GstRTPMuxPadPrivate *padpriv; + GstEvent *newseg_event = NULL; + gboolean drop; + + rtp_mux = GST_RTP_MUX (gst_pad_get_parent (pad)); + + if (!gst_rtp_buffer_validate (buffer)) { + gst_buffer_unref (buffer); + GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer"); + gst_object_unref (rtp_mux); + return GST_FLOW_ERROR; + } + + GST_OBJECT_LOCK (rtp_mux); + padpriv = gst_pad_get_element_private (pad); + if (!padpriv) { + GST_OBJECT_UNLOCK (rtp_mux); ret = GST_FLOW_NOT_LINKED; gst_buffer_unref (buffer); goto out; } - ret = gst_pad_push (rtp_mux->srcpad, buffer); + buffer = gst_buffer_make_writable (buffer); + + drop = !process_buffer_locked (rtp_mux, padpriv, buffer); + + if (!drop && rtp_mux->segment_pending) { + /* + * We set the start at 0, because we re-timestamps to the running time + */ + newseg_event = gst_event_new_new_segment_full (FALSE, 1.0, 1.0, + GST_FORMAT_TIME, 0, -1, 0); + + rtp_mux->segment_pending = FALSE; + } + GST_OBJECT_UNLOCK (rtp_mux); + + if (newseg_event) + gst_pad_push_event (rtp_mux->srcpad, newseg_event); + + if (drop) { + gst_buffer_unref (buffer); + ret = GST_FLOW_OK; + } else { + ret = gst_pad_push (rtp_mux->srcpad, buffer); + } out: diff --git a/gst/rtpmux/gstrtpmux.h b/gst/rtpmux/gstrtpmux.h index 8a6a5dd25..96513836d 100644 --- a/gst/rtpmux/gstrtpmux.h +++ b/gst/rtpmux/gstrtpmux.h @@ -38,6 +38,20 @@ G_BEGIN_DECLS typedef struct _GstRTPMux GstRTPMux; typedef struct _GstRTPMuxClass GstRTPMuxClass; + +typedef struct +{ + gboolean have_clock_base; + guint clock_base; + + GstCaps *out_caps; + + GstSegment segment; + + gboolean priority; +} GstRTPMuxPadPrivate; + + /** * GstRTPMux: * @@ -66,23 +80,11 @@ struct _GstRTPMuxClass { GstElementClass parent_class; - GstFlowReturn (*chain_func) (GstPad * pad, GstBuffer * buffer); + gboolean (*accept_buffer_locked) (GstRTPMux *rtp_mux, + GstRTPMuxPadPrivate * padpriv, GstBuffer * buffer); }; -typedef struct -{ - gboolean have_clock_base; - guint clock_base; - - GstCaps *out_caps; - - GstSegment segment; - - gboolean priority; -} GstRTPMuxPadPrivate; - - GType gst_rtp_mux_get_type (void); gboolean gst_rtp_mux_plugin_init (GstPlugin * plugin); |