From acb81c58c547b0e6155fd61b7914f39deea428ad Mon Sep 17 00:00:00 2001 From: Mark Nauwelaerts Date: Wed, 5 Mar 2008 06:03:03 +0000 Subject: Port mplex element to 0.10. Fixes bug #520329. Original commit message from CVS: Patch by: Mark Nauwelaerts * configure.ac: * ext/Makefile.am: * ext/mplex/Makefile.am: * ext/mplex/gstmplex.cc: * ext/mplex/gstmplex.hh: * ext/mplex/gstmplexibitstream.cc: * ext/mplex/gstmplexibitstream.hh: * ext/mplex/gstmplexjob.cc: * ext/mplex/gstmplexjob.hh: * ext/mplex/gstmplexoutputstream.cc: * ext/mplex/gstmplexoutputstream.hh: Port mplex element to 0.10. Fixes bug #520329. * tests/check/Makefile.am: * tests/check/elements/mplex.c: (test_sink_event), (setup_src_pad), (teardown_src_pad), (setup_mplex), (cleanup_mplex), (GST_START_TEST), (mplex_suite), (main): Add unit test for the mplex element. --- ext/mplex/Makefile.am | 6 +- ext/mplex/gstmplex.cc | 781 +++++++++++++++++++++++++++++--------- ext/mplex/gstmplex.hh | 64 +++- ext/mplex/gstmplexibitstream.cc | 77 ++-- ext/mplex/gstmplexibitstream.hh | 20 +- ext/mplex/gstmplexjob.cc | 53 ++- ext/mplex/gstmplexjob.hh | 2 + ext/mplex/gstmplexoutputstream.cc | 35 +- ext/mplex/gstmplexoutputstream.hh | 15 +- 9 files changed, 784 insertions(+), 269 deletions(-) (limited to 'ext/mplex') diff --git a/ext/mplex/Makefile.am b/ext/mplex/Makefile.am index d99b5b7c2..aa51a9950 100644 --- a/ext/mplex/Makefile.am +++ b/ext/mplex/Makefile.am @@ -6,8 +6,10 @@ libgstmplex_la_SOURCES = \ gstmplexjob.cc \ gstmplexoutputstream.cc -libgstmplex_la_CXXFLAGS = $(GST_CFLAGS) $(MPLEX_CFLAGS) -libgstmplex_la_LIBADD = $(MPLEX_LIBS) +libgstmplex_la_CXXFLAGS = \ + $(GST_PLUGINS_BASE_CFLAGS) $(GST_CXXFLAGS) $(MPLEX_CFLAGS) +libgstmplex_la_LIBADD = \ + $(GST_PLUGINS_BASE_LIBS) $(GST_BASE_LIBS) $(GST_LIBS) $(MPLEX_LIBS) libgstmplex_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) noinst_HEADERS = \ diff --git a/ext/mplex/gstmplex.cc b/ext/mplex/gstmplex.cc index a4c92f9c3..536882352 100644 --- a/ext/mplex/gstmplex.cc +++ b/ext/mplex/gstmplex.cc @@ -1,5 +1,6 @@ /* GStreamer mplex (mjpegtools) wrapper * (c) 2003 Ronald Bultje + * (c) 2008 Mark Nauwelaerts * * gstmplex.cc: gstreamer mplex wrapper * @@ -19,6 +20,37 @@ * Boston, MA 02111-1307, USA. */ +/** + * SECTION:element-mplex + * @see_also: mpeg2enc + * + * + * + * This element is an audio/video multiplexer for MPEG-1/2 video streams + * and (un)compressed audio streams such as AC3, MPEG layer I/II/III. + * It is based on the mjpegtools library. + * Documentation on creating MPEG videos in general can be found in the + * MJPEG Howto + * and the man-page of the mplex tool documents the properties of this element, + * which are shared with the mplex tool. + * + * Example pipeline + * + * + * gst-launch -v videotestsrc num-buffers=1000 ! mpeg2enc ! mplex ! filesink location=videotestsrc.mpg + * + * This example pipeline will encode a test video source to a an + * MPEG1 elementary stream and multiplexes this to an MPEG system stream. + * + * + * If several streams are being multiplexed, there should (as usual) be + * a queue in each stream, and due to mplex' buffering the capacities of these + * may have to be set to a few times the default settings to prevent the + * pipeline stalling. + * + * + */ + #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -28,10 +60,12 @@ #include "gstmplexibitstream.hh" #include "gstmplexjob.hh" +GST_DEBUG_CATEGORY (mplex_debug); + static GstStaticPadTemplate src_templ = GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC, GST_PAD_ALWAYS, - GST_STATIC_CAPS ("video/mpeg, " "systemstream = (boolean) true") + GST_STATIC_CAPS ("video/mpeg, systemstream = (boolean) true ") ); static GstStaticPadTemplate video_sink_templ = @@ -39,20 +73,29 @@ GST_STATIC_PAD_TEMPLATE ("video_%d", GST_PAD_SINK, GST_PAD_REQUEST, GST_STATIC_CAPS ("video/mpeg, " - "mpegversion = (int) [ 1, 2 ], " "systemstream = (boolean) false") + "mpegversion = (int) { 1, 2 }, " + "systemstream = (boolean) false, " + "width = (int) [ 16, 4096 ], " + "height = (int) [ 16, 4096 ], framerate = (fraction) [ 0, MAX ]") ); +#define COMMON_AUDIO_CAPS \ + "channels = (int) [ 1, 8 ], " \ + "rate = (int) [ 8000, 96000 ]" + static GstStaticPadTemplate audio_sink_templ = GST_STATIC_PAD_TEMPLATE ("audio_%d", GST_PAD_SINK, GST_PAD_REQUEST, GST_STATIC_CAPS ("audio/mpeg, " "mpegversion = (int) 1, " - "layer = (int) [ 1, 2 ]; " - "audio/x-ac3; " + "layer = (int) [ 1, 3 ], " + COMMON_AUDIO_CAPS "; " + "audio/x-ac3, " + COMMON_AUDIO_CAPS "; " "audio/x-dts; " "audio/x-raw-int, " - "endianness = (int) BYTE_ORDER, " + "endianness = (int) BIG_ENDIAN, " "signed = (boolean) TRUE, " "width = (int) { 16, 20, 24 }, " "depth = (int) { 16, 20, 24 }, " @@ -61,16 +104,13 @@ static GstStaticPadTemplate audio_sink_templ = /* FIXME: subtitles */ -static void gst_mplex_base_init (GstMplexClass * klass); -static void gst_mplex_class_init (GstMplexClass * klass); -static void gst_mplex_init (GstMplex * enc); -static void gst_mplex_dispose (GObject * object); - -static void gst_mplex_loop (GstElement * element); - +static void gst_mplex_finalize (GObject * object); +static void gst_mplex_reset (GstMplex * mplex); +static void gst_mplex_loop (GstMplex * mplex); static GstPad *gst_mplex_request_new_pad (GstElement * element, GstPadTemplate * templ, const gchar * name); - +static void gst_mplex_release_pad (GstElement * element, GstPad * pad); +static gboolean gst_mplex_src_activate_push (GstPad * pad, gboolean active); static GstStateChangeReturn gst_mplex_change_state (GstElement * element, GstStateChange transition); @@ -79,53 +119,26 @@ static void gst_mplex_get_property (GObject * object, static void gst_mplex_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); -static GstElementClass *parent_class = NULL; - -GType -gst_mplex_get_type (void) -{ - static GType gst_mplex_type = 0; - - if (!gst_mplex_type) { - static const GTypeInfo gst_mplex_info = { - sizeof (GstMplexClass), - (GBaseInitFunc) gst_mplex_base_init, - NULL, - (GClassInitFunc) gst_mplex_class_init, - NULL, - NULL, - sizeof (GstMplex), - 0, - (GInstanceInitFunc) gst_mplex_init, - }; - - gst_mplex_type = - g_type_register_static (GST_TYPE_ELEMENT, - "GstMplex", &gst_mplex_info, (GTypeFlags) 0); - } - - return gst_mplex_type; -} +GST_BOILERPLATE (GstMplex, gst_mplex, GstElement, GST_TYPE_ELEMENT); static void -gst_mplex_base_init (GstMplexClass * klass) +gst_mplex_base_init (gpointer klass) { - static GstElementDetails gst_mplex_details = { - "mplex video multiplexer", - "Codec/Muxer", - "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer", - "Andrew Stevens \n" - "Ronald Bultje " - }; GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + gst_element_class_set_details_simple (element_class, + "mplex video multiplexer", "Codec/Muxer", + "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer", + "Andrew Stevens \n" + "Ronald Bultje \n" + "Mark Nauwelaerts set_property = gst_mplex_set_property; object_class->get_property = gst_mplex_get_property; - object_class->dispose = gst_mplex_dispose; + /* register properties */ + GstMplexJob::initProperties (object_class); + + object_class->finalize = GST_DEBUG_FUNCPTR (gst_mplex_finalize); - element_class->change_state = gst_mplex_change_state; - element_class->request_new_pad = gst_mplex_request_new_pad; + element_class->change_state = GST_DEBUG_FUNCPTR (gst_mplex_change_state); + element_class->request_new_pad = + GST_DEBUG_FUNCPTR (gst_mplex_request_new_pad); + element_class->release_pad = GST_DEBUG_FUNCPTR (gst_mplex_release_pad); } static void -gst_mplex_dispose (GObject * object) +gst_mplex_finalize (GObject * object) { GstMplex *mplex = GST_MPLEX (object); + GSList *walk; - if (mplex->mux) { - delete mplex->mux; + /* release all pads */ + walk = mplex->pads; + while (walk) { + GstMplexPad *mpad = (GstMplexPad *) walk->data; - mplex->mux = NULL; + gst_object_unref (mpad->pad); + mpad->pad = NULL; + walk = walk->next; } + + /* clean up what's left of them */ + gst_mplex_reset (mplex); + + /* ... and of the rest */ delete mplex->job; + + g_mutex_free (mplex->tlock); + + G_OBJECT_CLASS (parent_class)->finalize (object); } static void -gst_mplex_init (GstMplex * mplex) +gst_mplex_init (GstMplex * mplex, GstMplexClass * g_class) { GstElement *element = GST_ELEMENT (mplex); - - GST_FLAG_SET (element, GST_ELEMENT_EVENT_AWARE); + GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); mplex->srcpad = - gst_pad_new_from_template (gst_element_get_pad_template (element, "src"), - "src"); + gst_pad_new_from_template (gst_element_class_get_pad_template + (element_class, "src"), "src"); gst_element_add_pad (element, mplex->srcpad); + gst_pad_use_fixed_caps (mplex->srcpad); + gst_pad_set_activatepush_function (mplex->srcpad, + GST_DEBUG_FUNCPTR (gst_mplex_src_activate_push)); mplex->job = new GstMplexJob (); - mplex->mux = NULL; mplex->num_apads = 0; mplex->num_vpads = 0; - gst_element_set_loop_function (element, gst_mplex_loop); + mplex->tlock = g_mutex_new (); + + gst_mplex_reset (mplex); } static void -gst_mplex_loop (GstElement * element) +gst_mplex_reset (GstMplex * mplex) { - GstMplex *mplex = GST_MPLEX (element); + GSList *walk; + GSList *nlist = NULL; - if (!mplex->mux) { - GstMplexOutputStream *out; - const GList *item; + mplex->eos = FALSE; + mplex->srcresult = GST_FLOW_CUSTOM_SUCCESS; - for (item = gst_element_get_pad_list (element); - item != NULL; item = item->next) { - StreamKind type; - GstMplexIBitStream *inputstream; - JobStream *jobstream; - GstPad *pad = GST_PAD (item->data); - GstStructure *structure; - const GstCaps *caps; - const gchar *mime; + /* reset existing streams */ + walk = mplex->pads; + while (walk != NULL) { + GstMplexPad *mpad; - /* skip our source pad */ - if (GST_PAD_DIRECTION (pad) == GST_PAD_SRC) - continue; + mpad = (GstMplexPad *) walk->data; - /* create inputstream, assure we've got caps */ - inputstream = new GstMplexIBitStream (pad); + mpad->needed = 0; + mpad->eos = FALSE; + gst_adapter_clear (mpad->adapter); + if (mpad->bs) { + delete mpad->bs; - /* skip unnegotiated pads */ - if (!(caps = GST_PAD_CAPS (pad))) { - delete inputstream; + mpad->bs = NULL; + } - continue; - } + if (!mpad->pad) { + g_cond_free (mpad->cond); + gst_object_unref (mpad->adapter); + g_free (mpad); + } else + nlist = g_slist_append (nlist, mpad); - /* get format */ - structure = gst_caps_get_structure (caps, 0); - mime = gst_structure_get_name (structure); - - if (!strcmp (mime, "video/mpeg")) { - VideoParams *params; - - type = MPEG_VIDEO; - - params = VideoParams::Default (mplex->job->mux_format); - mplex->job->video_param.push_back (params); - mplex->job->video_tracks++; - } else if (!strcmp (mime, "audio/mpeg")) { - type = MPEG_AUDIO; - mplex->job->audio_tracks++; - } else if (!strcmp (mime, "audio/x-ac3")) { - type = AC3_AUDIO; - mplex->job->audio_tracks++; - } else if (!strcmp (mime, "audio/x-dts")) { - type = DTS_AUDIO; - mplex->job->audio_tracks++; - } else if (!strcmp (mime, "audio/x-raw-int")) { - LpcmParams *params; - gint bits, chans, rate; - - type = LPCM_AUDIO; - - /* set LPCM params */ - gst_structure_get_int (structure, "depth", &bits); - gst_structure_get_int (structure, "rate", &rate); - gst_structure_get_int (structure, "channels", &chans); - params = LpcmParams::Checked (rate, chans, bits); - - mplex->job->lpcm_param.push_back (params); - mplex->job->audio_tracks++; - mplex->job->lpcm_tracks++; - } else { - delete inputstream; - - continue; - } + walk = walk->next; + } - jobstream = new JobStream (inputstream, type); - mplex->job->streams.push_back (jobstream); - } + g_slist_free (mplex->pads); + mplex->pads = nlist; - if (!mplex->job->video_tracks && !mplex->job->audio_tracks) { - GST_ELEMENT_ERROR (element, CORE, NEGOTIATION, (NULL), - ("no input video or audio tracks set up before loop function")); - return; - } + /* clear mplex stuff */ + /* clean up stream settings */ + while (!mplex->job->streams.empty ()) { + delete mplex->job->streams.back (); + + mplex->job->streams.pop_back (); + } + while (!mplex->job->video_param.empty ()) { + delete mplex->job->video_param.back (); - /* create new encoder with inputs/output */ - out = new GstMplexOutputStream (element, mplex->srcpad); - mplex->mux = new Multiplexor (*mplex->job, *out); + mplex->job->video_param.pop_back (); } + while (!mplex->job->lpcm_param.empty ()) { + delete mplex->job->lpcm_param.back (); - mplex->mux->Multiplex (); + mplex->job->lpcm_param.pop_back (); + } + mplex->job->audio_tracks = 0; + mplex->job->video_tracks = 0; + mplex->job->lpcm_tracks = 0; } -static GstPadLinkReturn -gst_mplex_sink_link (GstPad * pad, const GstCaps * caps) +static gboolean +gst_mplex_setcaps (GstPad * pad, GstCaps * caps) { - GstStructure *structure = gst_caps_get_structure (caps, 0); - const gchar *mime = gst_structure_get_name (structure); + GstMplex *mplex; + const gchar *mime; + GstStructure *structure; + StreamKind type; + JobStream *jobstream; + GstMplexIBitStream *inputstream; + GstMplexPad *mpad; + GstCaps *othercaps; + gboolean ret = TRUE; + + mplex = GST_MPLEX (GST_PAD_PARENT (pad)); + + /* does not go well to negotiate when started */ + if (mplex->srcresult != GST_FLOW_CUSTOM_SUCCESS) + goto refuse_renegotiation; + + /* since muxer does not really check much ... */ + othercaps = gst_caps_intersect (caps, gst_pad_get_pad_template_caps (pad)); + if (othercaps) + gst_caps_unref (othercaps); + else + goto refuse_caps; + + /* set the fixed template caps on the srcpad, should accept without objection */ + othercaps = gst_caps_copy (gst_pad_get_pad_template_caps (mplex->srcpad)); + ret = gst_pad_set_caps (mplex->srcpad, othercaps); + gst_caps_unref (othercaps); + if (!ret) + goto refuse_caps; + + structure = gst_caps_get_structure (caps, 0); + mime = gst_structure_get_name (structure); + + if (!strcmp (mime, "video/mpeg")) { /* video */ + VideoParams *params; + + type = MPEG_VIDEO; + if (mplex->job->bufsize) + params = VideoParams::Checked (mplex->job->bufsize); + else + params = VideoParams::Default (mplex->job->mux_format); + /* set standard values if forced by the selected profile */ + if (params->Force (mplex->job->mux_format)) + GST_WARNING_OBJECT (mplex, + "overriding non-standard option due to selected profile"); + + mplex->job->video_param.push_back (params); + mplex->job->video_tracks++; + } else { /* audio */ + if (!strcmp (mime, "audio/mpeg")) { + type = MPEG_AUDIO; + } else if (!strcmp (mime, "audio/x-ac3")) { + type = AC3_AUDIO; + } else if (!strcmp (mime, "audio/x-dts")) { + type = DTS_AUDIO; + } else if (!strcmp (mime, "audio/x-raw-int")) { + LpcmParams *params; + gint bits, chans, rate; + gboolean result = TRUE; + + type = LPCM_AUDIO; + + /* set LPCM params */ + result &= gst_structure_get_int (structure, "depth", &bits); + result &= gst_structure_get_int (structure, "rate", &rate); + result &= gst_structure_get_int (structure, "channels", &chans); + if (!result) + goto refuse_caps; + + params = LpcmParams::Checked (rate, chans, bits); + + mplex->job->lpcm_param.push_back (params); + mplex->job->lpcm_tracks++; + } else + goto refuse_caps; + + mplex->job->audio_tracks++; + } - /* raw audio caps needs to be fixed */ - if (!strcmp (mime, "audio/x-raw-int")) { - gint width, depth; + mpad = (GstMplexPad *) gst_pad_get_element_private (pad); + g_return_val_if_fail (mpad, FALSE); + inputstream = new GstMplexIBitStream (mpad); + mpad->bs = inputstream; + jobstream = new JobStream (inputstream, type); + mplex->job->streams.push_back (jobstream); - if (!gst_caps_is_fixed (caps)) - return GST_PAD_LINK_DELAYED; + return TRUE; - gst_structure_get_int (structure, "width", &width); - gst_structure_get_int (structure, "depth", &depth); +refuse_caps: + { + GST_WARNING_OBJECT (mplex, "refused caps %" GST_PTR_FORMAT, caps); - if (depth != width) - return GST_PAD_LINK_REFUSED; + /* undo if we were a bit too fast/confident */ + if (GST_PAD_CAPS (mplex->srcpad)) + gst_pad_set_caps (mplex->srcpad, NULL); + + return FALSE; } +refuse_renegotiation: + { + GST_WARNING_OBJECT (mplex, "already started; " + "refused (re)negotiation (to %" GST_PTR_FORMAT ")", caps); - /* we do the actual inputstream setup in our first loopfunc cycle */ - return GST_PAD_LINK_OK; + return FALSE; + } +} + +static void +gst_mplex_loop (GstMplex * mplex) +{ + GstMplexOutputStream *out = NULL; + Multiplexor *mux = NULL; + GSList *walk; + + /* do not try to resume muxing after it finished + * this can be relevant mainly/only in case of forced state change */ + if (mplex->eos) + goto eos; + + /* inform downstream about what's coming */ + gst_pad_push_event (mplex->srcpad, gst_event_new_new_segment (FALSE, 1.0, + GST_FORMAT_BYTES, 0, -1, 0)); + + /* hm (!) each inputstream really needs an initial read + * so that all is internally in the proper state */ + walk = mplex->pads; + while (walk != NULL) { + GstMplexPad *mpad; + + mpad = (GstMplexPad *) walk->data; + mpad->bs->ReadBuffer (); + + walk = walk->next; + } + + /* create new multiplexer with inputs/output */ + out = new GstMplexOutputStream (mplex, mplex->srcpad); +#if GST_MJPEGTOOLS_API >= 10900 + mux = new Multiplexor (*mplex->job, *out, NULL); +#else + mux = new Multiplexor (*mplex->job, *out); +#endif + + if (mux) { + mux->Multiplex (); + delete mux; + delete out; + + /* if not well and truly eos, something strange happened */ + if (!mplex->eos) { + GST_ERROR_OBJECT (mplex, "muxing task ended without being eos"); + /* notify there is no point in collecting any more */ + GST_MPLEX_MUTEX_LOCK (mplex); + mplex->srcresult = GST_FLOW_ERROR; + GST_MPLEX_SIGNAL_ALL (mplex); + GST_MPLEX_MUTEX_UNLOCK (mplex); + } else + goto eos; + } else { + GST_WARNING_OBJECT (mplex, "failed to create Multiplexor"); + } + + /* fall-through */ +done: + { + /* no need to run wildly, stopped elsewhere, e.g. state change */ + GST_DEBUG_OBJECT (mplex, "pausing muxing task"); + gst_pad_pause_task (mplex->srcpad); + + return; + } +eos: + { + GST_DEBUG_OBJECT (mplex, "encoding task reached eos"); + goto done; + } +} + +static gboolean +gst_mplex_sink_event (GstPad * sinkpad, GstEvent * event) +{ + GstMplex *mplex; + GstMplexPad *mpad; + gboolean result = TRUE; + + mplex = (GstMplex *) (GST_PAD_PARENT (sinkpad)); + mpad = (GstMplexPad *) gst_pad_get_element_private (sinkpad); + g_return_val_if_fail (mpad, FALSE); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_FLUSH_START: + /* forward event */ + gst_pad_event_default (sinkpad, event); + + /* now unblock the chain function */ + GST_MPLEX_MUTEX_LOCK (mplex); + mplex->srcresult = GST_FLOW_WRONG_STATE; + GST_MPLEX_SIGNAL (mplex, mpad); + GST_MPLEX_MUTEX_UNLOCK (mplex); + /* no way to pause/restart loop task */ + goto done; + case GST_EVENT_FLUSH_STOP: + /* forward event */ + gst_pad_event_default (sinkpad, event); + + /* clear state and resume */ + GST_MPLEX_MUTEX_LOCK (mplex); + gst_adapter_clear (mpad->adapter); + mplex->srcresult = GST_FLOW_OK; + GST_MPLEX_MUTEX_UNLOCK (mplex); + goto done; + case GST_EVENT_NEWSEGMENT: + /* eat segments; we make our own (byte)stream */ + gst_event_unref (event); + goto done; + case GST_EVENT_EOS: + /* inform this pad that it can stop now */ + GST_MPLEX_MUTEX_LOCK (mplex); + mpad->eos = TRUE; + GST_MPLEX_SIGNAL (mplex, mpad); + GST_MPLEX_MUTEX_UNLOCK (mplex); + + /* eat this event for now, task will send eos when finished */ + gst_event_unref (event); + goto done; + default: + /* for a serialized event, wait until earlier data is gone, + * though this is no guarantee as to when task is done with it. + * Only wait if loop has been started already */ + if (GST_EVENT_IS_SERIALIZED (event)) { + GST_MPLEX_MUTEX_LOCK (mplex); + while (mplex->srcresult == GST_FLOW_OK && !mpad->needed) + GST_MPLEX_WAIT (mplex, mpad); + GST_MPLEX_MUTEX_UNLOCK (mplex); + } + break; + } + + result = gst_pad_event_default (sinkpad, event); + +done: + return result; +} + +/* starts task if conditions are right for it + * must be called with mutex_lock held */ +static void +gst_mplex_start_task (GstMplex * mplex) +{ + /* start task to create multiplexor and start muxing */ + if (G_UNLIKELY (mplex->srcresult == GST_FLOW_CUSTOM_SUCCESS) + && mplex->job->video_tracks == mplex->num_vpads + && mplex->job->audio_tracks == mplex->num_apads) { + gst_pad_start_task (mplex->srcpad, (GstTaskFunction) gst_mplex_loop, mplex); + mplex->srcresult = GST_FLOW_OK; + } +} + +static GstFlowReturn +gst_mplex_chain (GstPad * sinkpad, GstBuffer * buffer) +{ + GstMplex *mplex; + GstMplexPad *mpad; + + mplex = (GstMplex *) (GST_PAD_PARENT (sinkpad)); + mpad = (GstMplexPad *) gst_pad_get_element_private (sinkpad); + g_return_val_if_fail (mpad, GST_FLOW_ERROR); + + /* check if pad were properly negotiated and set up */ + if (G_UNLIKELY (!mpad->bs)) { + GST_ELEMENT_ERROR (mplex, CORE, NEGOTIATION, (NULL), + ("input pad has not been set up prior to chain function")); + return GST_FLOW_NOT_NEGOTIATED; + } + + GST_MPLEX_MUTEX_LOCK (mplex); + + gst_mplex_start_task (mplex); + + if (G_UNLIKELY (mpad->eos)) + goto eos; + + if (G_UNLIKELY (!GST_FLOW_IS_SUCCESS (mplex->srcresult))) + goto ignore; + + gst_adapter_push (mpad->adapter, buffer); + buffer = NULL; + while (gst_adapter_available (mpad->adapter) >= mpad->needed) { + GST_MPLEX_SIGNAL (mplex, mpad); + GST_MPLEX_WAIT (mplex, mpad); + /* may have become flushing or in error */ + if (G_UNLIKELY (mplex->srcresult != GST_FLOW_OK)) + goto ignore; + /* or been removed */ + if (G_UNLIKELY (mpad->eos)) + goto eos; + } + + GST_MPLEX_MUTEX_UNLOCK (mplex); + + return GST_FLOW_OK; + +/* special cases */ +eos: + { + GST_DEBUG_OBJECT (mplex, "ignoring buffer at end-of-stream"); + GST_MPLEX_MUTEX_UNLOCK (mplex); + + gst_buffer_unref (buffer); + return GST_FLOW_UNEXPECTED; + } +ignore: + { + GstFlowReturn ret = mplex->srcresult; + + GST_DEBUG_OBJECT (mplex, "ignoring buffer because src task encountered %s", + gst_flow_get_name (ret)); + GST_MPLEX_MUTEX_UNLOCK (mplex); + + if (buffer) + gst_buffer_unref (buffer); + return ret; + } } static GstPad * @@ -307,24 +609,72 @@ gst_mplex_request_new_pad (GstElement * element, GstMplex *mplex = GST_MPLEX (element); gchar *padname; GstPad *newpad; + GstMplexPad *mpad; if (templ == gst_element_class_get_pad_template (klass, "audio_%d")) { + GST_DEBUG_OBJECT (mplex, "request pad audio %d", mplex->num_apads); padname = g_strdup_printf ("audio_%d", mplex->num_apads++); } else if (templ == gst_element_class_get_pad_template (klass, "video_%d")) { + GST_DEBUG_OBJECT (mplex, "request pad video %d", mplex->num_vpads); padname = g_strdup_printf ("video_%d", mplex->num_vpads++); } else { - g_warning ("mplex: this is not our template!"); + GST_WARNING_OBJECT (mplex, "This is not our template!"); return NULL; } newpad = gst_pad_new_from_template (templ, padname); - gst_pad_set_link_function (newpad, gst_mplex_sink_link); - gst_element_add_pad (element, newpad); g_free (padname); + mpad = g_new0 (GstMplexPad, 1); + mpad->adapter = gst_adapter_new (); + mpad->cond = g_cond_new (); + gst_object_ref (newpad); + mpad->pad = newpad; + + gst_pad_set_setcaps_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_setcaps)); + gst_pad_set_chain_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_chain)); + gst_pad_set_event_function (newpad, GST_DEBUG_FUNCPTR (gst_mplex_sink_event)); + gst_pad_set_element_private (newpad, mpad); + gst_element_add_pad (element, newpad); + mplex->pads = g_slist_append (mplex->pads, mpad); + return newpad; } +static void +gst_mplex_release_pad (GstElement * element, GstPad * pad) +{ + GstMplex *mplex = GST_MPLEX (element); + GstMplexPad *mpad; + + g_return_if_fail (pad); + mpad = (GstMplexPad *) gst_pad_get_element_private (pad); + g_return_if_fail (mpad); + + if (gst_element_remove_pad (element, pad)) { + gchar *padname; + + GST_MPLEX_MUTEX_LOCK (mplex); + mpad->eos = TRUE; + gst_object_unref (mpad->pad); + mpad->pad = NULL; + /* wake up if waiting on this pad */ + GST_MPLEX_SIGNAL (mplex, mpad); + + padname = gst_object_get_name (GST_OBJECT (pad)); + if (strstr (padname, "audio")) { + mplex->num_apads--; + } else { + mplex->num_vpads--; + } + g_free (padname); + + /* may now be up to us to get things going */ + gst_mplex_start_task (mplex); + GST_MPLEX_MUTEX_UNLOCK (mplex); + } +} + static void gst_mplex_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) @@ -339,33 +689,118 @@ gst_mplex_set_property (GObject * object, GST_MPLEX (object)->job->setProperty (prop_id, value); } +static gboolean +gst_mplex_src_activate_push (GstPad * pad, gboolean active) +{ + gboolean result = TRUE; + GstMplex *mplex; + + mplex = GST_MPLEX (GST_PAD_PARENT (pad)); + + if (active) { + /* chain will start task once all streams have been setup */ + } else { + /* end the muxing loop by forcing eos and unblock chains */ + GST_MPLEX_MUTEX_LOCK (mplex); + mplex->eos = TRUE; + mplex->srcresult = GST_FLOW_WRONG_STATE; + GST_MPLEX_SIGNAL_ALL (mplex); + GST_MPLEX_MUTEX_UNLOCK (mplex); + + /* muxing loop should have ended now and can be joined */ + result = gst_pad_stop_task (pad); + } + + return result; +} + static GstStateChangeReturn gst_mplex_change_state (GstElement * element, GstStateChange transition) { GstMplex *mplex = GST_MPLEX (element); + GstStateChangeReturn ret; + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + default: + break; + } + + ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); + if (ret == GST_STATE_CHANGE_FAILURE) + goto done; switch (transition) { case GST_STATE_CHANGE_PAUSED_TO_READY: - delete mplex->mux; - mplex->mux = NULL; - mplex->num_apads = 0; - mplex->num_vpads = 0; + gst_mplex_reset (mplex); break; default: break; } - if (parent_class->change_state) - return parent_class->change_state (element, transition); +done: + return ret; +} + +#ifndef GST_DISABLE_GST_DEBUG + +static mjpeg_log_handler_t old_handler = NULL; + +/* note that this will affect all mjpegtools elements/threads */ +static void +gst_mplex_log_callback (log_level_t level, const char *message) +{ + GstDebugLevel gst_level; + +#if GST_MJPEGTOOLS_API >= 10903 + static const gint mjpeg_log_error = mjpeg_loglev_t ("error"); + static const gint mjpeg_log_warn = mjpeg_loglev_t ("warn"); + static const gint mjpeg_log_info = mjpeg_loglev_t ("info"); + static const gint mjpeg_log_debug = mjpeg_loglev_t ("debug"); +#else + static const gint mjpeg_log_error = LOG_ERROR; + static const gint mjpeg_log_warn = LOG_WARN; + static const gint mjpeg_log_info = LOG_INFO; + static const gint mjpeg_log_debug = LOG_DEBUG; +#endif - return GST_STATE_CHANGE_SUCCESS; + if (level == mjpeg_log_error) { + gst_level = GST_LEVEL_ERROR; + } else if (level == mjpeg_log_warn) { + gst_level = GST_LEVEL_WARNING; + } else if (level == mjpeg_log_info) { + gst_level = GST_LEVEL_INFO; + } else if (level == mjpeg_log_debug) { + gst_level = GST_LEVEL_DEBUG; + } else { + gst_level = GST_LEVEL_INFO; + } + + /* message could have a % in it, do not segfault in such case */ + gst_debug_log (mplex_debug, gst_level, "", "", 0, NULL, "%s", message); + + /* chain up to the old handler; + * this could actually be a handler from another mjpegtools based + * gstreamer element; in which case messages can come out double or from + * the wrong element ... */ + old_handler (level, message); } +#endif static gboolean plugin_init (GstPlugin * plugin) { - if (!gst_library_load ("gstbytestream")) - return FALSE; +#ifndef GST_DISABLE_GST_DEBUG + old_handler = mjpeg_log_set_handler (gst_mplex_log_callback); + g_assert (old_handler != NULL); +#endif + /* in any case, we do not want default handler output */ + mjpeg_default_handler_verbosity (0); return gst_element_register (plugin, "mplex", GST_RANK_NONE, GST_TYPE_MPLEX); } @@ -374,4 +809,4 @@ GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, GST_VERSION_MINOR, "mplex", "High-quality MPEG/DVD/SVCD/VCD video/audio multiplexer", - plugin_init, VERSION, "GPL", GST_PACKAGE, GST_ORIGIN) + plugin_init, VERSION, "GPL", GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/ext/mplex/gstmplex.hh b/ext/mplex/gstmplex.hh index e8497ed62..c519f1dc3 100644 --- a/ext/mplex/gstmplex.hh +++ b/ext/mplex/gstmplex.hh @@ -23,7 +23,9 @@ #define __GST_MPLEX_H__ #include +#include #include +#include "gstmplexibitstream.hh" #include "gstmplexjob.hh" G_BEGIN_DECLS @@ -39,18 +41,74 @@ G_BEGIN_DECLS #define GST_IS_MPLEX_CLASS(obj) \ (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_MPLEX)) +GST_DEBUG_CATEGORY_EXTERN (mplex_debug); +#define GST_CAT_DEFAULT mplex_debug + +#define GST_MPLEX_MUTEX_LOCK(m) G_STMT_START { \ + GST_LOG_OBJECT (m, "locking tlock from thread %p", g_thread_self ()); \ + g_mutex_lock ((m)->tlock); \ + GST_LOG_OBJECT (m, "locked tlock from thread %p", g_thread_self ()); \ +} G_STMT_END + +#define GST_MPLEX_MUTEX_UNLOCK(m) G_STMT_START { \ + GST_LOG_OBJECT (m, "unlocking tlock from thread %p", g_thread_self ()); \ + g_mutex_unlock ((m)->tlock); \ +} G_STMT_END + +#define GST_MPLEX_WAIT(m, p) G_STMT_START { \ + GST_LOG_OBJECT (m, "thread %p waiting", g_thread_self ()); \ + g_cond_wait ((p)->cond, (m)->tlock); \ +} G_STMT_END + +#define GST_MPLEX_SIGNAL(m, p) G_STMT_START { \ + GST_LOG_OBJECT (m, "signalling from thread %p", g_thread_self ()); \ + g_cond_signal ((p)->cond); \ +} G_STMT_END + +#define GST_MPLEX_SIGNAL_ALL(m) G_STMT_START { \ + GST_LOG_OBJECT (m, "signalling all from thread %p", g_thread_self ()); \ + GSList *walk = m->pads; \ + while (walk) { \ + GST_MPLEX_SIGNAL (m, (GstMplexPad *) walk->data); \ + walk = walk->next; \ + } \ +} G_STMT_END + +typedef struct _GstMplexPad +{ + /* associated pad */ + GstPad *pad; + /* with mplex TLOCK */ + /* adapter collecting buffers for this pad */ + GstAdapter *adapter; + /* no more to expect on this pad */ + gboolean eos; + /* signals counterpart thread to have a look */ + GCond *cond; + /* amount needed by mplex on this stream */ + guint needed; + /* bitstream for this pad */ + GstMplexIBitStream *bs; +} GstMplexPad; + typedef struct _GstMplex { GstElement parent; /* pads */ - GstPad *sinkpad, *srcpad; + GSList *pads; + GstPad *srcpad; guint num_apads, num_vpads; /* options wrapper */ GstMplexJob *job; - /* general muxing object (contains rest) */ - Multiplexor *mux; + /* lock for syncing */ + GMutex *tlock; + /* with TLOCK */ + /* muxer writer generated eos */ + gboolean eos; + /* flowreturn obtained by muxer task */ + GstFlowReturn srcresult; } GstMplex; typedef struct _GstMplexClass { diff --git a/ext/mplex/gstmplexibitstream.cc b/ext/mplex/gstmplexibitstream.cc index 0f517703a..4249aea31 100644 --- a/ext/mplex/gstmplexibitstream.cc +++ b/ext/mplex/gstmplexibitstream.cc @@ -1,5 +1,6 @@ /* GStreamer mplex (mjpegtools) wrapper * (c) 2003 Ronald Bultje + * (c) 2008 Mark Nauwelaerts * * gstmplexibitstream.hh: gstreamer/mplex input bitstream wrapper * @@ -25,41 +26,23 @@ #include +#include "gstmplex.hh" #include "gstmplexibitstream.hh" /* * Class init/exit functions. */ -GstMplexIBitStream::GstMplexIBitStream (GstPad * _pad, guint buf_size): +GstMplexIBitStream::GstMplexIBitStream (GstMplexPad * _data, guint buf_size): IBitStream () { - guint8 *data; - - pad = _pad; - bs = gst_bytestream_new (pad); + mpad = _data; + mplex = GST_MPLEX (GST_PAD_PARENT (mpad->pad)); eos = FALSE; - streamname = g_strdup (gst_pad_get_name (_pad)); - SetBufSize (buf_size); eobs = false; byteidx = 0; - - /* we peek 1 byte (not even caring about the return value) so we - * are sure that we have data and thus capsnego must be completed - * when we return. */ - gst_bytestream_peek_bytes (bs, &data, 1); - - if (!ReadIntoBuffer () && buffered == 0) { - GST_ELEMENT_ERROR (gst_pad_get_parent (_pad), RESOURCE, READ, (NULL), - ("Failed to read from input pad %s", gst_pad_get_name (pad))); - } -} - -GstMplexIBitStream::~GstMplexIBitStream (void) -{ - gst_bytestream_destroy (bs); } /* @@ -67,47 +50,45 @@ GstMplexIBitStream::~GstMplexIBitStream (void) */ size_t -GstMplexIBitStream::ReadStreamBytes (uint8_t * buf, size_t size) + GstMplexIBitStream::ReadStreamBytes (uint8_t * buf, size_t size = + BUFFER_SIZE) { guint8 *data; - guint read = 0; - - if (eos) - return 0; + GST_MPLEX_MUTEX_LOCK (mplex); - while (!eos && (read = gst_bytestream_peek_bytes (bs, &data, size)) != size) { - GstEvent *event; + GST_DEBUG_OBJECT (mplex, "needing %d bytes", (guint) size); - guint pending; - - gst_bytestream_get_status (bs, &pending, &event); - if (event) { - switch (GST_EVENT_TYPE (event)) { - case GST_EVENT_EOS: - eos = TRUE; - break; - default: - break; - } - gst_event_unref (event); - } + while (gst_adapter_available (mpad->adapter) < size + && !mplex->eos && !mpad->eos) { + mpad->needed = size; + GST_MPLEX_SIGNAL (mplex, mpad); + GST_MPLEX_WAIT (mplex, mpad); } - if (read > 0) { - memcpy (buf, data, read); - gst_bytestream_flush_fast (bs, read); + mpad->needed = 0; + size = MIN (size, gst_adapter_available (mpad->adapter)); + if (size) { + data = gst_adapter_take (mpad->adapter, size); + memcpy (buf, data, size); + g_free (data); } - return read; + GST_MPLEX_MUTEX_UNLOCK (mplex); + + return size; } /* * Are we at EOS? */ -bool -GstMplexIBitStream::EndOfStream (void) +bool GstMplexIBitStream::EndOfStream (void) { return eos; } + +bool GstMplexIBitStream::ReadBuffer () +{ + return ReadIntoBuffer (BUFFER_SIZE); +} diff --git a/ext/mplex/gstmplexibitstream.hh b/ext/mplex/gstmplexibitstream.hh index 504835a4a..533160d40 100644 --- a/ext/mplex/gstmplexibitstream.hh +++ b/ext/mplex/gstmplexibitstream.hh @@ -1,5 +1,6 @@ /* GStreamer mplex (mjpegtools) wrapper * (c) 2003 Ronald Bultje + * (c) 2008 Mark Nauwelaerts * * gstmplexibitstream.hh: gstreamer/mplex input bitstream wrapper * @@ -23,27 +24,30 @@ #define __GST_MPLEXIBITSTREAM_H__ #include -#include #include #include +#include "gstmplex.hh" + +/* forward declaration; break circular referencing */ +typedef struct _GstMplex GstMplex; +typedef struct _GstMplexPad GstMplexPad; + class GstMplexIBitStream : public IBitStream { public: - GstMplexIBitStream (GstPad *pad, - guint buf_size = BUFFER_SIZE); - ~GstMplexIBitStream (void); + GstMplexIBitStream (GstMplexPad *pad, guint buf_size = BUFFER_SIZE); + bool ReadBuffer (); protected: /* read data */ - size_t ReadStreamBytes (uint8_t *buf, - size_t number); + size_t ReadStreamBytes (uint8_t *buf, size_t number); /* are we at EOS? */ bool EndOfStream (void); private: - GstPad *pad; - GstByteStream *bs; + GstMplex *mplex; + GstMplexPad *mpad; gboolean eos; }; diff --git a/ext/mplex/gstmplexjob.cc b/ext/mplex/gstmplexjob.cc index f13aa36a0..33597c77b 100644 --- a/ext/mplex/gstmplexjob.cc +++ b/ext/mplex/gstmplexjob.cc @@ -36,7 +36,8 @@ enum ARG_SPLIT_SEQUENCE, ARG_SEGMENT_SIZE, ARG_PACKETS_PER_PACK, - ARG_SECTOR_SIZE + ARG_SECTOR_SIZE, + ARG_BUFSIZE /* FILL ME */ }; @@ -54,16 +55,16 @@ gst_mplex_format_get_type (void) if (!mplex_format_type) { static const GEnumValue mplex_formats[] = { - {0, "0", "Generic MPEG-1"}, - {1, "1", "Standard VCD"}, - {2, "2", "User VCD"}, - {3, "3", "Generic MPEG-2"}, - {4, "4", "Standard SVCD"}, - {5, "5", "User SVCD"}, - {6, "6", "VCD Stills sequences"}, - {7, "7", "SVCD Stills sequences"}, - {8, "8", "DVD MPEG-2 for dvdauthor"}, - {9, "9", "DVD MPEG-2"}, + {0, "Generic MPEG-1", "0"}, + {1, "Standard VCD", "1"}, + {2, "User VCD", "2"}, + {3, "Generic MPEG-2", "3"}, + {4, "Standard SVCD", "4"}, + {5, "User SVCD", "5"}, + {6, "VCD Stills sequences", "6"}, + {7, "SVCD Stills sequences", "7"}, + {8, "DVD MPEG-2 for dvdauthor", "8"}, + {9, "DVD MPEG-2", "9"}, {0, NULL, NULL}, }; @@ -82,6 +83,7 @@ GstMplexJob::GstMplexJob (void): MultiplexJob () { /* blabla */ + bufsize = 0; } /* @@ -104,13 +106,15 @@ GstMplexJob::initProperties (GObjectClass * klass) "Bitrate of output stream in kbps (0 = autodetect)", 0, 15 * 1024, 0, (GParamFlags) G_PARAM_READWRITE)); -#if 0 - { - "video-buffer", 1, 0, 'b'} - , -#endif - /* some boolean stuff for headers */ - g_object_class_install_property (klass, ARG_VBR, + /* override decode buffer size otherwise determined by format */ + g_object_class_install_property (klass, ARG_BUFSIZE, + g_param_spec_int ("bufsize", "Decoder buf. size", + "Target decoders video buffer size (kB) " + "[default determined by format if not explicitly set]", + 20, 4000, 46, (GParamFlags) G_PARAM_READWRITE)); + + /* some boolean stuff for headers */ + g_object_class_install_property (klass, ARG_VBR, g_param_spec_boolean ("vbr", "VBR", "Whether the input video stream is variable bitrate", FALSE, (GParamFlags) G_PARAM_READWRITE)); @@ -118,17 +122,19 @@ GstMplexJob::initProperties (GObjectClass * klass) g_param_spec_boolean ("system-headers", "System headers", "Create system header in every pack for generic formats", FALSE, (GParamFlags) G_PARAM_READWRITE)); +#if 0 /* not supported */ g_object_class_install_property (klass, ARG_SPLIT_SEQUENCE, g_param_spec_boolean ("split-sequence", "Split sequence", "Simply split a sequence across files " "(rather than building run-out/run-in)", FALSE, (GParamFlags) G_PARAM_READWRITE)); - /* size of a segment (followed by EOS) */ + /* size of a segment */ g_object_class_install_property (klass, ARG_SEGMENT_SIZE, g_param_spec_int ("max-segment-size", "Max. segment size", "Max. size per segment/file in MB (0 = unlimited)", 0, 10 * 1024, 0, (GParamFlags) G_PARAM_READWRITE)); +#endif /* packets per pack (generic formats) */ g_object_class_install_property (klass, ARG_PACKETS_PER_PACK, @@ -155,7 +161,8 @@ GstMplexJob::getProperty (guint prop_id, GValue * value) g_value_set_enum (value, mux_format); break; case ARG_MUX_BITRATE: - g_value_set_int (value, data_rate / 1000); + /* convert from bytes back to bits */ + g_value_set_int (value, (data_rate * 8) / 1000); break; case ARG_VBR: g_value_set_boolean (value, VBR); @@ -175,6 +182,9 @@ GstMplexJob::getProperty (guint prop_id, GValue * value) case ARG_SECTOR_SIZE: g_value_set_int (value, sector_size); break; + case ARG_BUFSIZE: + g_value_set_int (value, bufsize); + break; default: break; } @@ -211,6 +221,9 @@ GstMplexJob::setProperty (guint prop_id, const GValue * value) case ARG_SECTOR_SIZE: sector_size = g_value_get_int (value); break; + case ARG_BUFSIZE: + bufsize = g_value_get_int (value); + break; default: break; } diff --git a/ext/mplex/gstmplexjob.hh b/ext/mplex/gstmplexjob.hh index 8aa5b14dc..8e9607046 100644 --- a/ext/mplex/gstmplexjob.hh +++ b/ext/mplex/gstmplexjob.hh @@ -37,6 +37,8 @@ public: GValue *value); void setProperty (guint prop_id, const GValue *value); + + int bufsize; }; #endif /* __GST_MPLEXJOB_H__ */ diff --git a/ext/mplex/gstmplexoutputstream.cc b/ext/mplex/gstmplexoutputstream.cc index 206c13ec7..5a0cc5ae9 100644 --- a/ext/mplex/gstmplexoutputstream.cc +++ b/ext/mplex/gstmplexoutputstream.cc @@ -1,5 +1,6 @@ /* GStreamer mplex (mjpegtools) wrapper * (c) 2003 Ronald Bultje + * (c) 2008 Mark Nauwelaerts * * gstmplexoutputstream.hh: gstreamer/mplex output stream wrapper * @@ -25,16 +26,17 @@ #include +#include "gstmplex.hh" #include "gstmplexoutputstream.hh" /* * Class init functions. */ -GstMplexOutputStream::GstMplexOutputStream (GstElement * _element, GstPad * _pad): +GstMplexOutputStream::GstMplexOutputStream (GstMplex * _element, GstPad * _pad): OutputStream () { - element = _element; + mplex = _element; pad = _pad; size = 0; } @@ -54,21 +56,32 @@ GstMplexOutputStream::Open (void) void GstMplexOutputStream::Close (void) { - gst_pad_push (pad, GST_DATA (gst_event_new (GST_EVENT_EOS))); - gst_element_set_eos (element); + GST_MPLEX_MUTEX_LOCK (mplex); + GST_DEBUG_OBJECT (mplex, "closing stream and sending eos"); + gst_pad_push_event (pad, gst_event_new_eos ()); + /* notify chain there is no more need to supply buffers */ + mplex->eos = TRUE; + GST_MPLEX_SIGNAL_ALL (mplex); + GST_MPLEX_MUTEX_UNLOCK (mplex); } /* * Get size of current segment. */ -off_t GstMplexOutputStream::SegmentSize (void) +#if GST_MJPEGTOOLS_API >= 10900 +uint64_t +GstMplexOutputStream::SegmentSize (void) +#else +off_t +GstMplexOutputStream::SegmentSize (void) +#endif { return size; } /* - * Next segment. + * Next segment; not really supported. */ void @@ -76,9 +89,8 @@ GstMplexOutputStream::NextSegment (void) { size = 0; - /* send EOS. The filesink (or whatever) handles that - * and opens a new file. */ - gst_pad_push (pad, GST_DATA (gst_event_new (GST_EVENT_EOS))); + GST_WARNING_OBJECT (mplex, "multiple file output is not supported"); + /* FIXME: no such filesink behaviour to be expected */ } /* @@ -94,5 +106,8 @@ GstMplexOutputStream::Write (guint8 * data, guint len) memcpy (GST_BUFFER_DATA (buf), data, len); size += len; - gst_pad_push (pad, GST_DATA (buf)); + GST_MPLEX_MUTEX_LOCK (mplex); + gst_buffer_set_caps (buf, GST_PAD_CAPS (pad)); + mplex->srcresult = gst_pad_push (pad, buf); + GST_MPLEX_MUTEX_UNLOCK (mplex); } diff --git a/ext/mplex/gstmplexoutputstream.hh b/ext/mplex/gstmplexoutputstream.hh index c67040b7d..f93c2f951 100644 --- a/ext/mplex/gstmplexoutputstream.hh +++ b/ext/mplex/gstmplexoutputstream.hh @@ -1,5 +1,6 @@ /* GStreamer mplex (mjpegtools) wrapper * (c) 2003 Ronald Bultje + * (c) 2008 Mark Nauwelaerts * * gstmplexoutputstream.hh: gstreamer/mplex output stream wrapper * @@ -26,27 +27,31 @@ #include #include +#include "gstmplex.hh" + class GstMplexOutputStream : public OutputStream { public: - GstMplexOutputStream (GstElement *element, - GstPad *pad); + GstMplexOutputStream (GstMplex *element, GstPad *pad); /* open/close. Basically 'no-op's (close() sets EOS). */ int Open (void); void Close (void); /* get size of current segment */ +#if GST_MJPEGTOOLS_API >= 10900 + uint64_t SegmentSize (void); +#else off_t SegmentSize (void); +#endif /* next segment */ void NextSegment (void); /* write data */ - void Write (guint8 *data, - guint len); + void Write (guint8 *data, guint len); private: - GstElement *element; + GstMplex *mplex; GstPad *pad; guint64 size; }; -- cgit v1.2.1