From 86be138973c5ed441b9a9ec87833de430d2a71dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Cerveau?= Date: Fri, 24 Apr 2020 16:15:42 +0200 Subject: openjpegdec: support for a multithreaded decoding. Part-of: --- ext/openjpeg/gstopenjpeg.h | 2 + ext/openjpeg/gstopenjpegdec.c | 727 ++++++++++++++++++++++++++++++++++-------- ext/openjpeg/gstopenjpegdec.h | 23 +- 3 files changed, 617 insertions(+), 135 deletions(-) (limited to 'ext') diff --git a/ext/openjpeg/gstopenjpeg.h b/ext/openjpeg/gstopenjpeg.h index 275b8dcf6..be591463c 100644 --- a/ext/openjpeg/gstopenjpeg.h +++ b/ext/openjpeg/gstopenjpeg.h @@ -41,9 +41,11 @@ typedef struct { GstVideoCodecFrame *frame; GstBuffer *output_buffer; + GstBuffer *input_buffer; gint stripe; OpenJPEGErrorCode last_error; gboolean direct; + gboolean last_subframe; } GstOpenJPEGCodecMessage; #endif /* __GST_OPENJPEG_H__ */ diff --git a/ext/openjpeg/gstopenjpegdec.c b/ext/openjpeg/gstopenjpegdec.c index 6ab7f0d41..5ceb1ef5e 100644 --- a/ext/openjpeg/gstopenjpegdec.c +++ b/ext/openjpeg/gstopenjpegdec.c @@ -20,6 +20,23 @@ * */ +/** + * SECTION:element-openjpegdec + * @title: openjpegdec + * @see_also: openjpegenc + * + * openjpegdec decodes openjpeg stream. + * + * ## Example launch lines + * |[ + * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc ! jpeg2000parse ! openjpegdec ! videoconvert ! autovideosink sync=false + * ]| Encode and decode whole frames. + * |[ + * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc num-threads=8 num-stripes=8 ! jpeg2000parse ! openjpegdec max-slice-threads=8 ! videoconvert ! autovideosink sync=fals + * ]| Encode and decode frame split with stripes. + * + */ + #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -36,15 +53,24 @@ enum { PROP_0, PROP_MAX_THREADS, + PROP_MAX_SLICE_THREADS, PROP_LAST }; #define GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS 0 +/* prototypes */ +static void gst_openjpeg_dec_finalize (GObject * object); + +static GstStateChangeReturn +gst_openjpeg_dec_change_state (GstElement * element, GstStateChange transition); + static gboolean gst_openjpeg_dec_start (GstVideoDecoder * decoder); static gboolean gst_openjpeg_dec_stop (GstVideoDecoder * decoder); static gboolean gst_openjpeg_dec_set_format (GstVideoDecoder * decoder, GstVideoCodecState * state); +static gboolean gst_openjpeg_dec_flush (GstVideoDecoder * decoder); +static gboolean gst_openjpeg_dec_finish (GstVideoDecoder * decoder); static GstFlowReturn gst_openjpeg_dec_handle_frame (GstVideoDecoder * decoder, GstVideoCodecFrame * frame); static gboolean gst_openjpeg_dec_decide_allocation (GstVideoDecoder * decoder, @@ -54,6 +80,13 @@ static void gst_openjpeg_dec_set_property (GObject * object, static void gst_openjpeg_dec_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); +static gboolean gst_openjpeg_dec_decode_frame_multiple (GstVideoDecoder * + decoder, GstVideoCodecFrame * frame); +static gboolean gst_openjpeg_dec_decode_frame_single (GstVideoDecoder * decoder, + GstVideoCodecFrame * frame); + +static void gst_openjpeg_dec_pause_loop (GstOpenJPEGDec * self, + GstFlowReturn flow_ret); #if G_BYTE_ORDER == G_LITTLE_ENDIAN #define GRAY16 "GRAY16_LE" @@ -110,8 +143,13 @@ gst_openjpeg_dec_class_init (GstOpenJPEGDecClass * klass) "Decode JPEG2000 streams", "Sebastian Dröge "); + element_class->change_state = + GST_DEBUG_FUNCPTR (gst_openjpeg_dec_change_state); + video_decoder_class->start = GST_DEBUG_FUNCPTR (gst_openjpeg_dec_start); video_decoder_class->stop = GST_DEBUG_FUNCPTR (gst_openjpeg_dec_stop); + video_decoder_class->flush = GST_DEBUG_FUNCPTR (gst_openjpeg_dec_flush); + video_decoder_class->finish = GST_DEBUG_FUNCPTR (gst_openjpeg_dec_finish); video_decoder_class->set_format = GST_DEBUG_FUNCPTR (gst_openjpeg_dec_set_format); video_decoder_class->handle_frame = @@ -119,17 +157,31 @@ gst_openjpeg_dec_class_init (GstOpenJPEGDecClass * klass) video_decoder_class->decide_allocation = gst_openjpeg_dec_decide_allocation; gobject_class->set_property = gst_openjpeg_dec_set_property; gobject_class->get_property = gst_openjpeg_dec_get_property; + gobject_class->finalize = gst_openjpeg_dec_finalize; /** - * GstOpenJPEGDec:max-threads: + * GstOpenJPEGDec:max-slice-threads: * * Maximum number of worker threads to spawn. (0 = auto) * + * Since: 1.20 + */ + g_object_class_install_property (G_OBJECT_CLASS (klass), + PROP_MAX_SLICE_THREADS, g_param_spec_int ("max-slice-threads", + "Maximum slice decoding threads", + "Maximum number of worker threads to spawn according to the frame boundary. (0 = no thread)", + 0, G_MAXINT, GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstOpenJPEGDec:max-threads: + * + * Maximum number of worker threads to spawn used by openjpeg internally. (0 = no thread) + * * Since: 1.18 */ g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_MAX_THREADS, - g_param_spec_int ("max-threads", "Maximum decode threads", - "Maximum number of worker threads to spawn. (0 = auto)", + g_param_spec_int ("max-threads", "Maximum openjpeg threads", + "Maximum number of worker threads to spawn used by openjpeg internally. (0 = no thread)", 0, G_MAXINT, GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); @@ -149,8 +201,13 @@ gst_openjpeg_dec_init (GstOpenJPEGDec * self) GST_PAD_SET_ACCEPT_TEMPLATE (GST_VIDEO_DECODER_SINK_PAD (self)); opj_set_default_decoder_parameters (&self->params); self->sampling = GST_JPEG2000_SAMPLING_NONE; - self->max_threads = GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS; + self->max_slice_threads = GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS; + self->available_threads = GST_OPENJPEG_DEC_DEFAULT_MAX_THREADS; self->num_procs = g_get_num_processors (); + g_mutex_init (&self->messages_lock); + g_mutex_init (&self->decoding_lock); + g_cond_init (&self->messages_cond); + g_queue_init (&self->messages); } static gboolean @@ -159,6 +216,11 @@ gst_openjpeg_dec_start (GstVideoDecoder * decoder) GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); GST_DEBUG_OBJECT (self, "Starting"); + self->available_threads = self->max_slice_threads; + if (self->available_threads) + self->decode_frame = gst_openjpeg_dec_decode_frame_multiple; + else + self->decode_frame = gst_openjpeg_dec_decode_frame_single; return TRUE; } @@ -169,6 +231,8 @@ gst_openjpeg_dec_stop (GstVideoDecoder * video_decoder) GstOpenJPEGDec *self = GST_OPENJPEG_DEC (video_decoder); GST_DEBUG_OBJECT (self, "Stopping"); + g_mutex_lock (&self->messages_lock); + gst_pad_stop_task (GST_VIDEO_DECODER_SRC_PAD (video_decoder)); if (self->output_state) { gst_video_codec_state_unref (self->output_state); @@ -179,16 +243,77 @@ gst_openjpeg_dec_stop (GstVideoDecoder * video_decoder) gst_video_codec_state_unref (self->input_state); self->input_state = NULL; } - - if (self->current_frame) { - gst_video_codec_frame_unref (self->current_frame); - self->current_frame = NULL; - } + g_mutex_unlock (&self->messages_lock); GST_DEBUG_OBJECT (self, "Stopped"); return TRUE; } +static void +gst_openjpeg_dec_finalize (GObject * object) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (object); + + g_mutex_clear (&self->messages_lock); + g_mutex_clear (&self->decoding_lock); + g_cond_clear (&self->messages_cond); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstStateChangeReturn +gst_openjpeg_dec_change_state (GstElement * element, GstStateChange transition) +{ + GstOpenJPEGDec *self; + GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS; + + g_return_val_if_fail (GST_IS_OPENJPEG_DEC (element), + GST_STATE_CHANGE_FAILURE); + self = GST_OPENJPEG_DEC (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + self->draining = FALSE; + self->started = FALSE; + self->flushing = FALSE; + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + self->flushing = TRUE; + g_mutex_lock (&self->drain_lock); + self->draining = FALSE; + g_cond_broadcast (&self->drain_cond); + g_mutex_unlock (&self->drain_lock); + break; + default: + break; + } + + ret = + GST_ELEMENT_CLASS (gst_openjpeg_dec_parent_class)->change_state + (element, transition); + + if (ret == GST_STATE_CHANGE_FAILURE) + return ret; + + switch (transition) { + case GST_STATE_CHANGE_PLAYING_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + self->started = FALSE; + self->downstream_flow_ret = GST_FLOW_FLUSHING; + break; + case GST_STATE_CHANGE_READY_TO_NULL: + break; + default: + break; + } + + return ret; +} static void gst_openjpeg_dec_set_property (GObject * object, @@ -197,6 +322,9 @@ gst_openjpeg_dec_set_property (GObject * object, GstOpenJPEGDec *dec = (GstOpenJPEGDec *) object; switch (prop_id) { + case PROP_MAX_SLICE_THREADS: + g_atomic_int_set (&dec->max_slice_threads, g_value_get_int (value)); + break; case PROP_MAX_THREADS: g_atomic_int_set (&dec->max_threads, g_value_get_int (value)); break; @@ -213,6 +341,9 @@ gst_openjpeg_dec_get_property (GObject * object, GstOpenJPEGDec *dec = (GstOpenJPEGDec *) object; switch (prop_id) { + case PROP_MAX_SLICE_THREADS: + g_value_set_int (value, g_atomic_int_get (&dec->max_slice_threads)); + break; case PROP_MAX_THREADS: g_value_set_int (value, g_atomic_int_get (&dec->max_threads)); break; @@ -313,6 +444,7 @@ fill_frame_packed8_4 (GstOpenJPEGDec * self, GstVideoFrame * frame, /* copy only the stripe content (image) to the full size frame */ y0 = image->y0; y1 = image->y1; + GST_DEBUG_OBJECT (self, "yo=%d y1=%d", y0, y1); data_out += y0 * dstride; for (y = y0; y < y1; y++) { tmp = data_out; @@ -841,6 +973,9 @@ gst_openjpeg_dec_negotiate (GstOpenJPEGDec * self, opj_image_t * image) if (image->color_space == OPJ_CLRSPC_UNKNOWN || image->color_space == 0) image->color_space = self->color_space; + if (!self->input_state) + return GST_FLOW_FLUSHING; + switch (image->color_space) { case OPJ_CLRSPC_SRGB: if (image->numcomps == 4) { @@ -1146,64 +1281,221 @@ seek_fn (OPJ_OFF_T p_nb_bytes, void *p_user_data) return OPJ_TRUE; } +static gboolean +gst_openjpeg_dec_is_last_input_subframe (GstVideoDecoder * dec, + GstOpenJPEGCodecMessage * message) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (dec); + + return (message->last_subframe || message->stripe == self->num_stripes); +} + +static gboolean +gst_openjpeg_dec_is_last_output_subframe (GstVideoDecoder * dec, + GstOpenJPEGCodecMessage * message) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (dec); + + return (gst_video_decoder_get_processed_subframe_index (dec, + message->frame) == (self->num_stripes - 1)); +} + + +static gboolean +gst_openjpeg_dec_has_pending_job_to_finish (GstOpenJPEGDec * self) +{ + gboolean res = FALSE; + if (self->downstream_flow_ret != GST_FLOW_OK) + return res; + g_mutex_lock (&self->messages_lock); + res = (!g_queue_is_empty (&self->messages) + || (self->available_threads < self->max_slice_threads)); + g_mutex_unlock (&self->messages_lock); + return res; +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_decode_message_new (GstOpenJPEGDec * self, + GstVideoCodecFrame * frame, int num_stripe) +{ + GstOpenJPEGCodecMessage *message = g_slice_new0 (GstOpenJPEGCodecMessage); + GST_DEBUG_OBJECT (self, "message: %p", message); + message->frame = gst_video_codec_frame_ref (frame); + message->stripe = num_stripe; + message->last_error = OPENJPEG_ERROR_NONE; + message->input_buffer = gst_buffer_ref (frame->input_buffer); + message->last_subframe = GST_BUFFER_FLAG_IS_SET (frame->input_buffer, + GST_BUFFER_FLAG_MARKER); + return message; +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_decode_message_free (GstOpenJPEGDec * self, + GstOpenJPEGCodecMessage * message) +{ + if (!message) + return message; + gst_buffer_unref (message->input_buffer); + gst_video_codec_frame_unref (message->frame); + GST_DEBUG_OBJECT (self, "message: %p", message); + g_slice_free (GstOpenJPEGCodecMessage, message); + return NULL; +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_dec_wait_for_new_message (GstOpenJPEGDec * self, gboolean dry_run) +{ + GstOpenJPEGCodecMessage *message = NULL; + g_mutex_lock (&self->messages_lock); + if (dry_run && self->available_threads == self->max_slice_threads) + goto done; + if (!g_queue_is_empty (&self->messages) && !dry_run) { + message = g_queue_pop_head (&self->messages); + } else { + g_cond_wait (&self->messages_cond, &self->messages_lock); + } + +done: + g_mutex_unlock (&self->messages_lock); + return message; +} + static void -gst_openjpeg_dec_handle_frame_cleanup (GstOpenJPEGDec * self, - GstVideoCodecFrame * frame, - GstMapInfo * map, - opj_codec_t * dec, opj_stream_t * stream, opj_image_t * image) +gst_openjpeg_dec_pause_loop (GstOpenJPEGDec * self, GstFlowReturn flow_ret) { - if (image) - opj_image_destroy (image); - if (stream) - opj_stream_destroy (stream); - if (dec) - opj_destroy_codec (dec); - if (frame) { - if (map) - gst_buffer_unmap (frame->input_buffer, map); - gst_video_codec_frame_unref (frame); + g_mutex_lock (&self->drain_lock); + GST_DEBUG_OBJECT (self, "Pause the loop draining %d flow_ret %s", + self->draining, gst_flow_get_name (flow_ret)); + if (self->draining) { + self->draining = FALSE; + g_cond_broadcast (&self->drain_cond); } + gst_pad_pause_task (GST_VIDEO_DECODER_SRC_PAD (self)); + self->downstream_flow_ret = flow_ret; + self->started = FALSE; + g_mutex_unlock (&self->drain_lock); } -static GstFlowReturn -gst_openjpeg_dec_handle_frame (GstVideoDecoder * decoder, - GstVideoCodecFrame * frame) +static void +gst_openjpeg_dec_loop (GstOpenJPEGDec * self) { - GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); - GstFlowReturn ret = GST_FLOW_OK; - gint64 deadline; + GstOpenJPEGCodecMessage *message; + GstVideoDecoder *decoder = GST_VIDEO_DECODER (self); + GstFlowReturn flow_ret = GST_FLOW_OK; + + message = gst_openjpeg_dec_wait_for_new_message (self, FALSE); + if (message) { + GST_DEBUG_OBJECT (self, + "received message for frame %p stripe %d last_error %d threads %d", + message->frame, message->stripe, message->last_error, + self->available_threads); + + if (self->flushing) + goto flushing; + + if (message->last_error != OPENJPEG_ERROR_NONE) + goto decode_error; + + g_mutex_lock (&self->decoding_lock); + + if (gst_openjpeg_dec_is_last_output_subframe (decoder, message)) + flow_ret = gst_video_decoder_finish_frame (decoder, message->frame); + else + gst_video_decoder_finish_subframe (decoder, message->frame); + g_mutex_unlock (&self->decoding_lock); + message = gst_openjpeg_decode_message_free (self, message); + g_cond_broadcast (&self->messages_cond); + } + + if (flow_ret != GST_FLOW_OK) + goto flow_error; + + if (self->draining && !gst_openjpeg_dec_has_pending_job_to_finish (self)) + gst_openjpeg_dec_pause_loop (self, GST_FLOW_OK); + + if (self->flushing) + goto flushing; + + return; + +decode_error: + { + GST_ELEMENT_ERROR (self, LIBRARY, FAILED, (NULL), + ("OPEN JPEG decode fail %d", message->last_error)); + gst_video_codec_frame_unref (message->frame); + gst_pad_push_event (GST_VIDEO_DECODER_SRC_PAD (self), gst_event_new_eos ()); + gst_openjpeg_dec_pause_loop (self, GST_FLOW_ERROR); + gst_openjpeg_decode_message_free (self, message); + return; + } + +flushing: + { + GST_DEBUG_OBJECT (self, "Flushing -- stopping task"); + if (message) { + gst_video_codec_frame_unref (message->frame); + gst_openjpeg_decode_message_free (self, message); + } + gst_openjpeg_dec_pause_loop (self, GST_FLOW_FLUSHING); + return; + } + +flow_error: + { + if (flow_ret == GST_FLOW_EOS) { + GST_DEBUG_OBJECT (self, "EOS"); + + gst_pad_push_event (GST_VIDEO_DECODER_SRC_PAD (self), + gst_event_new_eos ()); + } else if (flow_ret < GST_FLOW_EOS) { + GST_ELEMENT_ERROR (self, STREAM, FAILED, + ("Internal data stream error."), ("stream stopped, reason %s", + gst_flow_get_name (flow_ret))); + + gst_pad_push_event (GST_VIDEO_DECODER_SRC_PAD (self), + gst_event_new_eos ()); + } else if (flow_ret == GST_FLOW_FLUSHING) { + GST_DEBUG_OBJECT (self, "Flushing -- stopping task"); + } + gst_openjpeg_dec_pause_loop (self, flow_ret); + + return; + } + +} + +#define DECODE_ERROR(self, message, err_code, mutex_unlock) { \ + GST_WARNING_OBJECT(self, "An error occurred err_code=%d", err_code);\ + message->last_error = err_code; \ + if (mutex_unlock) \ + g_mutex_unlock (&self->decoding_lock);\ + goto done; \ +} + +static void +gst_openjpeg_dec_decode_stripe (GstElement * element, gpointer user_data) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (element); + GstVideoDecoder *decoder = GST_VIDEO_DECODER (element); + GstOpenJPEGCodecMessage *message = (GstOpenJPEGCodecMessage *) user_data; GstMapInfo map; + GstVideoFrame vframe; opj_codec_t *dec = NULL; opj_stream_t *stream = NULL; MemStream mstream; opj_image_t *image = NULL; - GstVideoFrame vframe; opj_dparameters_t params; gint max_threads; - guint current_stripe = 1; - - current_stripe = gst_video_decoder_get_current_subframe_index (decoder); - - GST_DEBUG_OBJECT (self, "Handling frame with current stripe %d", - current_stripe); - deadline = gst_video_decoder_get_max_decode_time (decoder, frame); - if (self->drop_subframes || deadline < 0) { - GST_INFO_OBJECT (self, - "Dropping too late frame: deadline %" G_GINT64_FORMAT, deadline); - self->drop_subframes = TRUE; - if (current_stripe == self->num_stripes) { - ret = gst_video_decoder_drop_frame (decoder, frame); - self->drop_subframes = FALSE; - } else - gst_video_decoder_drop_subframe (decoder, frame); + GstFlowReturn ret; + gint i; - return ret; - } + GST_DEBUG_OBJECT (self, "Start to decode stripe %p %d", message->frame, + message->stripe); dec = opj_create_decompress (self->codec_format); if (!dec) - goto initialization_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_INIT, FALSE); if (G_UNLIKELY (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= GST_LEVEL_TRACE)) { @@ -1220,24 +1512,25 @@ gst_openjpeg_dec_handle_frame (GstVideoDecoder * decoder, if (self->ncomps) params.jpwl_exp_comps = self->ncomps; if (!opj_setup_decoder (dec, ¶ms)) - goto open_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_OPEN, FALSE); max_threads = g_atomic_int_get (&self->max_threads); - if (max_threads == 0) + if (max_threads > self->num_procs) max_threads = self->num_procs; if (!opj_codec_set_threads (dec, max_threads)) GST_WARNING_OBJECT (self, "Failed to set %d number of threads", max_threads); - if (!gst_buffer_map (frame->input_buffer, &map, GST_MAP_READ)) - goto map_read_error; + if (!gst_buffer_map (message->input_buffer, &map, GST_MAP_READ)) + DECODE_ERROR (self, message, OPENJPEG_ERROR_MAP_READ, FALSE); + if (self->is_jp2c && map.size < 8) - goto open_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_MAP_READ, FALSE); stream = opj_stream_create (4096, OPJ_TRUE); if (!stream) - goto open_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_OPEN, FALSE); mstream.data = map.data + (self->is_jp2c ? 8 : 0); mstream.offset = 0; @@ -1252,120 +1545,286 @@ gst_openjpeg_dec_handle_frame (GstVideoDecoder * decoder, image = NULL; if (!opj_read_header (stream, dec, &image)) - goto decode_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_DECODE, FALSE); if (!opj_decode (dec, stream, image)) - goto decode_error; - - { - gint i; + DECODE_ERROR (self, message, OPENJPEG_ERROR_DECODE, FALSE); - for (i = 0; i < image->numcomps; i++) { - if (image->comps[i].data == NULL) - goto decode_error; - } + for (i = 0; i < image->numcomps; i++) { + if (image->comps[i].data == NULL) + DECODE_ERROR (self, message, OPENJPEG_ERROR_DECODE, FALSE); } - gst_buffer_unmap (frame->input_buffer, &map); + gst_buffer_unmap (message->input_buffer, &map); + + g_mutex_lock (&self->decoding_lock); ret = gst_openjpeg_dec_negotiate (self, image); if (ret != GST_FLOW_OK) - goto negotiate_error; - if (!gst_video_decoder_get_subframe_mode (decoder) - || gst_video_decoder_get_current_subframe_index (decoder) == 1) { - ret = gst_video_decoder_allocate_output_frame (decoder, frame); + DECODE_ERROR (self, message, OPENJPEG_ERROR_NEGOCIATE, TRUE); + + if (message->frame->output_buffer == NULL) { + ret = gst_video_decoder_allocate_output_frame (decoder, message->frame); if (ret != GST_FLOW_OK) - goto allocate_error; - self->current_frame = gst_video_codec_frame_ref (frame); + DECODE_ERROR (self, message, OPENJPEG_ERROR_ALLOCATE, TRUE); } if (!gst_video_frame_map (&vframe, &self->output_state->info, - self->current_frame->output_buffer, GST_MAP_WRITE)) - goto map_write_error; + message->frame->output_buffer, GST_MAP_WRITE)) + DECODE_ERROR (self, message, OPENJPEG_ERROR_MAP_WRITE, TRUE); - if (current_stripe) + if (message->stripe) self->fill_frame (self, &vframe, image); else { GST_ERROR_OBJECT (decoder, " current_stripe should be greater than 0"); - goto map_write_error; + DECODE_ERROR (self, message, OPENJPEG_ERROR_MAP_WRITE, TRUE); } gst_video_frame_unmap (&vframe); + g_mutex_unlock (&self->decoding_lock); + message->last_error = OPENJPEG_ERROR_NONE; + GST_DEBUG_OBJECT (self, "Finished to decode stripe message=%p stripe=%d", + message->frame, message->stripe); +done: + if (!message->direct) { + g_mutex_lock (&self->messages_lock); + self->available_threads++; + g_queue_push_tail (&self->messages, message); + g_mutex_unlock (&self->messages_lock); + g_cond_broadcast (&self->messages_cond); + } - opj_end_decompress (dec, stream); - opj_stream_destroy (stream); - opj_image_destroy (image); - opj_destroy_codec (dec); + if (stream) { + opj_end_decompress (dec, stream); + opj_stream_destroy (stream); + } + if (image) + opj_image_destroy (image); + if (dec) + opj_destroy_codec (dec); +} - if (current_stripe == self->num_stripes) { - ret = gst_video_decoder_finish_frame (decoder, self->current_frame); - gst_video_codec_frame_unref (frame); - self->current_frame = NULL; - } else if (gst_video_decoder_get_current_subframe_index (decoder) > 0) - gst_video_codec_frame_unref (frame); +static GstFlowReturn +gst_openjpeg_dec_decode_frame_multiple (GstVideoDecoder * decoder, + GstVideoCodecFrame * frame) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); + GstOpenJPEGCodecMessage *message = NULL; + guint current_stripe = + gst_video_decoder_get_input_subframe_index (decoder, frame); + + if (!self->started) { + GST_DEBUG_OBJECT (self, "Starting task"); + gst_pad_start_task (GST_VIDEO_DECODER_SRC_PAD (self), + (GstTaskFunction) gst_openjpeg_dec_loop, decoder, NULL); + self->started = TRUE; + } + /* Make sure to release the base class stream lock, otherwise + * _loop() can't call _finish_frame() and we might block forever + * because no input buffers are released */ + GST_VIDEO_DECODER_STREAM_UNLOCK (self); + + while (!self->available_threads) + gst_openjpeg_dec_wait_for_new_message (self, TRUE); + + GST_VIDEO_DECODER_STREAM_LOCK (self); + + if (self->downstream_flow_ret != GST_FLOW_OK) + return self->downstream_flow_ret; + + g_mutex_lock (&self->messages_lock); + message = gst_openjpeg_decode_message_new (self, frame, current_stripe); + GST_LOG_OBJECT (self, + "About to enqueue a decoding message from frame %p stripe %d", frame, + message->stripe); + + if (self->available_threads) + self->available_threads--; + g_mutex_unlock (&self->messages_lock); + + gst_element_call_async (GST_ELEMENT (self), + (GstElementCallAsyncFunc) gst_openjpeg_dec_decode_stripe, message, NULL); + if (gst_video_decoder_get_subframe_mode (decoder) + && gst_openjpeg_dec_is_last_input_subframe (decoder, message)) + gst_video_decoder_have_last_subframe (decoder, frame); + return GST_FLOW_OK; +} + +static GstFlowReturn +gst_openjpeg_dec_decode_frame_single (GstVideoDecoder * decoder, + GstVideoCodecFrame * frame) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); + GstOpenJPEGCodecMessage *message = NULL; + guint current_stripe = + gst_video_decoder_get_input_subframe_index (decoder, frame); + GstFlowReturn ret = GST_FLOW_OK; + message = gst_openjpeg_decode_message_new (self, frame, current_stripe); + message->direct = TRUE; + gst_openjpeg_dec_decode_stripe (GST_ELEMENT (decoder), message); + if (message->last_error != OPENJPEG_ERROR_NONE) { + GST_WARNING_OBJECT + (self, "An error occured %d during the JPEG decoding", + message->last_error); + self->last_error = message->last_error; + ret = GST_FLOW_ERROR; + goto done; + } + if (gst_openjpeg_dec_is_last_output_subframe (decoder, message)) + ret = gst_video_decoder_finish_frame (decoder, message->frame); + else + gst_video_decoder_finish_subframe (decoder, message->frame); + +done: + gst_openjpeg_decode_message_free (self, message); return ret; +} -initialization_error: - { - gst_video_codec_frame_unref (frame); - GST_ELEMENT_ERROR (self, LIBRARY, INIT, - ("Failed to initialize OpenJPEG decoder"), (NULL)); +static gboolean +gst_openjpeg_dec_flush (GstVideoDecoder * decoder) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); - return GST_FLOW_ERROR; - } -map_read_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, NULL, dec, stream, - image); + GST_DEBUG_OBJECT (self, "Flushing decoder"); + + /* 2) Wait until the srcpad loop is stopped, + * unlock GST_VIDEO_DECODER_STREAM_LOCK to prevent deadlocks + * caused by using this lock from inside the loop function */ + GST_VIDEO_DECODER_STREAM_UNLOCK (self); + gst_pad_stop_task (GST_VIDEO_DECODER_SRC_PAD (decoder)); + GST_DEBUG_OBJECT (self, "Flushing -- task stopped"); + GST_VIDEO_DECODER_STREAM_LOCK (self); + + /* Reset our state */ + self->started = FALSE; + GST_DEBUG_OBJECT (self, "Flush finished"); + + return TRUE; +} - GST_ELEMENT_ERROR (self, CORE, FAILED, - ("Failed to map input buffer"), (NULL)); - return GST_FLOW_ERROR; +static GstFlowReturn +gst_openjpeg_dec_handle_frame (GstVideoDecoder * decoder, + GstVideoCodecFrame * frame) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); + GstFlowReturn ret = GST_FLOW_OK; + gint64 deadline; + guint current_stripe = + gst_video_decoder_get_input_subframe_index (decoder, frame); + + if (self->downstream_flow_ret != GST_FLOW_OK) { + gst_video_codec_frame_unref (frame); + return self->downstream_flow_ret; } -open_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, &map, dec, stream, - image); - GST_ELEMENT_ERROR (self, LIBRARY, INIT, - ("Failed to open OpenJPEG stream"), (NULL)); - return GST_FLOW_ERROR; + GST_DEBUG_OBJECT (self, "Handling frame with current stripe %d", + current_stripe); + + deadline = gst_video_decoder_get_max_decode_time (decoder, frame); + if (self->drop_subframes || deadline < 0) { + GST_INFO_OBJECT (self, + "Dropping too late frame: deadline %" G_GINT64_FORMAT, deadline); + self->drop_subframes = TRUE; + if (current_stripe == self->num_stripes || + GST_BUFFER_FLAG_IS_SET (frame->input_buffer, GST_BUFFER_FLAG_MARKER)) { + ret = gst_video_decoder_drop_frame (decoder, frame); + self->drop_subframes = FALSE; + } else { + gst_video_decoder_drop_subframe (decoder, frame); + } + + goto done; } -decode_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, &map, dec, stream, - image); - GST_VIDEO_DECODER_ERROR (self, 1, STREAM, DECODE, - ("Failed to decode OpenJPEG stream"), (NULL), ret); - return ret; + ret = self->decode_frame (decoder, frame); + if (ret != GST_FLOW_OK) { + GST_WARNING_OBJECT (self, "Unable to decode the frame with flow error: %s", + gst_flow_get_name (ret)); + goto error; } -negotiate_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, NULL, dec, stream, - image); - GST_ELEMENT_ERROR (self, CORE, NEGOTIATION, - ("Failed to negotiate"), (NULL)); - return ret; +done: + return ret; + +error: + switch (self->last_error) { + case OPENJPEG_ERROR_INIT: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to initialize OpenJPEG decoder"), (NULL)); + break; + case OPENJPEG_ERROR_MAP_READ: + GST_ELEMENT_ERROR (self, CORE, FAILED, + ("Failed to map input buffer"), (NULL)); + break; + case OPENJPEG_ERROR_MAP_WRITE: + GST_ELEMENT_ERROR (self, CORE, FAILED, + ("Failed to map input buffer"), (NULL)); + break; + case OPENJPEG_ERROR_FILL_IMAGE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to fill OpenJPEG image"), (NULL)); + break; + case OPENJPEG_ERROR_OPEN: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to open OpenJPEG data"), (NULL)); + break; + case OPENJPEG_ERROR_DECODE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to decode OpenJPEG data"), (NULL)); + break; + case OPENJPEG_ERROR_NEGOCIATE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to negociate OpenJPEG data"), (NULL)); + break; + case OPENJPEG_ERROR_ALLOCATE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to allocate OpenJPEG data"), (NULL)); + break; + default: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to encode OpenJPEG data"), (NULL)); + break; } -allocate_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, NULL, dec, stream, - image); - GST_ELEMENT_ERROR (self, CORE, FAILED, - ("Failed to allocate output buffer"), (NULL)); - return ret; + return GST_FLOW_ERROR; +} + +static GstFlowReturn +gst_openjpeg_dec_finish (GstVideoDecoder * decoder) +{ + GstOpenJPEGDec *self = GST_OPENJPEG_DEC (decoder); + + GST_DEBUG_OBJECT (self, "Draining component"); + + if (!self->started) { + GST_DEBUG_OBJECT (self, "Component not started yet"); + return GST_FLOW_OK; } -map_write_error: - { - gst_openjpeg_dec_handle_frame_cleanup (self, frame, NULL, dec, stream, - image); - GST_ELEMENT_ERROR (self, CORE, FAILED, ("Failed to map output buffer"), - (NULL)); - return GST_FLOW_ERROR; + + self->draining = TRUE; + if (!gst_openjpeg_dec_has_pending_job_to_finish (self)) { + GST_DEBUG_OBJECT (self, "Component ready"); + g_cond_broadcast (&self->messages_cond); + return GST_FLOW_OK; } + + /* Make sure to release the base class stream lock, otherwise + * _loop() can't call _finish_frame() and we might block forever + * because no input buffers are released */ + GST_VIDEO_DECODER_STREAM_UNLOCK (self); + + g_mutex_lock (&self->drain_lock); + GST_DEBUG_OBJECT (self, "Waiting until component is drained"); + + while (self->draining) + g_cond_wait (&self->drain_cond, &self->drain_lock); + + GST_DEBUG_OBJECT (self, "Drained component"); + + g_mutex_unlock (&self->drain_lock); + GST_VIDEO_DECODER_STREAM_LOCK (self); + self->started = FALSE; + return GST_FLOW_OK; } static gboolean diff --git a/ext/openjpeg/gstopenjpegdec.h b/ext/openjpeg/gstopenjpegdec.h index 427b85902..413062e3f 100644 --- a/ext/openjpeg/gstopenjpegdec.h +++ b/ext/openjpeg/gstopenjpegdec.h @@ -58,15 +58,36 @@ struct _GstOpenJPEGDec GstJPEG2000Sampling sampling; gint ncomps; gint max_threads; /* atomic */ + gint max_slice_threads; /* internal openjpeg threading system */ gint num_procs; gint num_stripes; - GstVideoCodecFrame *current_frame; gboolean drop_subframes; void (*fill_frame) (GstOpenJPEGDec *self, GstVideoFrame *frame, opj_image_t * image); + gboolean (*decode_frame) (GstVideoDecoder * decoder, GstVideoCodecFrame *frame); + opj_dparameters_t params; + + guint available_threads; + GQueue messages; + + GCond messages_cond; + GMutex messages_lock; + GMutex decoding_lock; + GstFlowReturn downstream_flow_ret; + gboolean flushing; + + /* Draining state */ + GMutex drain_lock; + GCond drain_cond; + /* TRUE if EOS buffers shouldn't be forwarded */ + gboolean draining; /* protected by drain_lock */ + + int last_error; + + gboolean started; }; struct _GstOpenJPEGDecClass -- cgit v1.2.1