From 116a10d531d4d9bd523b943a427ebec21ab1784c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Cerveau?= Date: Tue, 21 Apr 2020 20:56:03 +0200 Subject: openjpegenc: support for a multithreaded encoding. This commit introduces a multithreaded encoder allowing to encode mulitple stripes or subframes in separated threads. This feature aims to enhance the overall latency of a codec pipeline. Part-of: --- ext/openjpeg/gstopenjpeg.h | 23 ++ ext/openjpeg/gstopenjpegenc.c | 579 +++++++++++++++++++++++++++++++----------- ext/openjpeg/gstopenjpegenc.h | 8 + 3 files changed, 463 insertions(+), 147 deletions(-) (limited to 'ext') diff --git a/ext/openjpeg/gstopenjpeg.h b/ext/openjpeg/gstopenjpeg.h index 4f0754289..275b8dcf6 100644 --- a/ext/openjpeg/gstopenjpeg.h +++ b/ext/openjpeg/gstopenjpeg.h @@ -23,4 +23,27 @@ #include +typedef enum +{ + OPENJPEG_ERROR_NONE = 0, + OPENJPEG_ERROR_INIT, + OPENJPEG_ERROR_ENCODE, + OPENJPEG_ERROR_DECODE, + OPENJPEG_ERROR_OPEN, + OPENJPEG_ERROR_MAP_READ, + OPENJPEG_ERROR_MAP_WRITE, + OPENJPEG_ERROR_FILL_IMAGE, + OPENJPEG_ERROR_NEGOCIATE, + OPENJPEG_ERROR_ALLOCATE, +} OpenJPEGErrorCode; + +typedef struct +{ + GstVideoCodecFrame *frame; + GstBuffer *output_buffer; + gint stripe; + OpenJPEGErrorCode last_error; + gboolean direct; +} GstOpenJPEGCodecMessage; + #endif /* __GST_OPENJPEG_H__ */ diff --git a/ext/openjpeg/gstopenjpegenc.c b/ext/openjpeg/gstopenjpegenc.c index f129e56ef..af5814bb4 100644 --- a/ext/openjpeg/gstopenjpegenc.c +++ b/ext/openjpeg/gstopenjpegenc.c @@ -20,6 +20,23 @@ * */ +/** + * SECTION:element-openjpegenc + * @title: openjpegenc + * @see_also: openjpegdec + * + * openjpegenc encodes raw video stream. + * + * ## Example launch lines + * |[ + * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc ! jpeg2000parse ! openjpegdec ! videoconvert ! autovideosink sync=false + * ]| Encode and decode whole frames. + * |[ + * gst-launch-1.0 -v videotestsrc num-buffers=10 ! openjpegenc num-threads=8 num-stripes=8 ! jpeg2000parse ! openjpegdec max-threads=8 ! videoconvert ! autovideosink sync=fals + * ]| Encode and decode frame split with stripes. + * + */ + #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -69,9 +86,11 @@ enum PROP_TILE_WIDTH, PROP_TILE_HEIGHT, PROP_NUM_STRIPES, + PROP_NUM_THREADS, PROP_LAST }; + #define DEFAULT_NUM_LAYERS 1 #define DEFAULT_NUM_RESOLUTIONS 6 #define DEFAULT_PROGRESSION_ORDER OPJ_LRCP @@ -80,6 +99,13 @@ enum #define DEFAULT_TILE_WIDTH 0 #define DEFAULT_TILE_HEIGHT 0 #define GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES 1 +#define GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS 0 + +/* prototypes */ +static void gst_openjpeg_enc_finalize (GObject * object); + +static GstStateChangeReturn +gst_openjpeg_enc_change_state (GstElement * element, GstStateChange transition); static void gst_openjpeg_enc_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec); @@ -94,6 +120,12 @@ static GstFlowReturn gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder, GstVideoCodecFrame * frame); static gboolean gst_openjpeg_enc_propose_allocation (GstVideoEncoder * encoder, GstQuery * query); +static GstFlowReturn gst_openjpeg_enc_encode_frame_multiple (GstVideoEncoder * + encoder, GstVideoCodecFrame * frame); +static GstFlowReturn gst_openjpeg_enc_encode_frame_single (GstVideoEncoder * + encoder, GstVideoCodecFrame * frame); +static GstOpenJPEGCodecMessage + * gst_openjpeg_encode_message_free (GstOpenJPEGCodecMessage * message); #if G_BYTE_ORDER == G_LITTLE_ENDIAN #define GRAY16 "GRAY16_LE" @@ -150,6 +182,10 @@ gst_openjpeg_enc_class_init (GstOpenJPEGEncClass * klass) gobject_class->set_property = gst_openjpeg_enc_set_property; gobject_class->get_property = gst_openjpeg_enc_get_property; + gobject_class->finalize = gst_openjpeg_enc_finalize; + + element_class->change_state = + GST_DEBUG_FUNCPTR (gst_openjpeg_enc_change_state); g_object_class_install_property (gobject_class, PROP_NUM_LAYERS, g_param_spec_int ("num-layers", "Number of layers", @@ -199,6 +235,18 @@ gst_openjpeg_enc_class_init (GstOpenJPEGEncClass * klass) "Number of stripes for low latency encoding. (1 = low latency disabled)", 1, G_MAXINT, GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); + /** + * GstOpenJPEGEnc:num-threads: + * + * Max number of simultaneous threads to encode stripes, default: encode with streaming thread + * + * Since: 1.20 + */ + g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_NUM_THREADS, + g_param_spec_uint ("num-threads", "Number of threads", + "Max number of simultaneous threads to encode stripe or frame, default: encode with streaming thread.", + 0, G_MAXINT, GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS, + G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); gst_element_class_add_static_pad_template (element_class, &gst_openjpeg_enc_src_template); @@ -259,6 +307,10 @@ gst_openjpeg_enc_init (GstOpenJPEGEnc * self) && self->params.cp_tdy != 0); self->num_stripes = GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES; + g_cond_init (&self->messages_cond); + g_queue_init (&self->messages); + + self->available_threads = GST_OPENJPEG_ENC_DEFAULT_NUM_THREADS; } static void @@ -296,6 +348,9 @@ gst_openjpeg_enc_set_property (GObject * object, guint prop_id, case PROP_NUM_STRIPES: self->num_stripes = g_value_get_int (value); break; + case PROP_NUM_THREADS: + self->available_threads = g_value_get_uint (value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -333,6 +388,9 @@ gst_openjpeg_enc_get_property (GObject * object, guint prop_id, GValue * value, case PROP_NUM_STRIPES: g_value_set_int (value, self->num_stripes); break; + case PROP_NUM_THREADS: + g_value_set_uint (value, self->available_threads); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; @@ -345,6 +403,10 @@ gst_openjpeg_enc_start (GstVideoEncoder * encoder) GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder); GST_DEBUG_OBJECT (self, "Starting"); + if (self->available_threads) + self->encode_frame = gst_openjpeg_enc_encode_frame_multiple; + else + self->encode_frame = gst_openjpeg_enc_encode_frame_single; return TRUE; } @@ -371,6 +433,56 @@ gst_openjpeg_enc_stop (GstVideoEncoder * video_encoder) return TRUE; } +static void +gst_openjpeg_enc_flush_messages (GstOpenJPEGEnc * self) +{ + GstOpenJPEGCodecMessage *enc_params; + + GST_OBJECT_LOCK (self); + while ((enc_params = g_queue_pop_head (&self->messages))) { + gst_openjpeg_encode_message_free (enc_params); + } + g_cond_broadcast (&self->messages_cond); + GST_OBJECT_UNLOCK (self); +} + +static void +gst_openjpeg_enc_finalize (GObject * object) +{ + GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (object); + + gst_openjpeg_enc_flush_messages (self); + g_cond_clear (&self->messages_cond); + + G_OBJECT_CLASS (parent_class)->finalize (object); +} + +static GstStateChangeReturn +gst_openjpeg_enc_change_state (GstElement * element, GstStateChange transition) +{ + GstOpenJPEGEnc *self; + + g_return_val_if_fail (GST_IS_OPENJPEG_ENC (element), + GST_STATE_CHANGE_FAILURE); + self = GST_OPENJPEG_ENC (element); + + switch (transition) { + case GST_STATE_CHANGE_NULL_TO_READY: + break; + case GST_STATE_CHANGE_READY_TO_PAUSED: + break; + case GST_STATE_CHANGE_PAUSED_TO_PLAYING: + break; + case GST_STATE_CHANGE_PAUSED_TO_READY: + gst_openjpeg_enc_flush_messages (self); + break; + default: + break; + } + + return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); +} + static guint get_stripe_height (GstOpenJPEGEnc * self, guint slice_num, guint frame_height) { @@ -809,7 +921,7 @@ gst_openjpeg_enc_fill_image (GstOpenJPEGEnc * self, GstVideoFrame * frame, for (i = 0; i < ncomps; i++) { gint nominal_height = min_height / comps[i].dy; - comps[i].h = (slice_num < self->num_stripes - 1) ? + comps[i].h = (slice_num < self->num_stripes) ? nominal_height : GST_VIDEO_FRAME_COMP_HEIGHT (frame, i) - (self->num_stripes - 1) * nominal_height; @@ -837,10 +949,10 @@ gst_openjpeg_enc_fill_image (GstOpenJPEGEnc * self, GstVideoFrame * frame, image->x0 = 0; image->x1 = GST_VIDEO_FRAME_WIDTH (frame); - image->y0 = slice_num * min_height; + image->y0 = (slice_num - 1) * min_height; image->y1 = (slice_num < - self->num_stripes - 1) ? image->y0 + + self->num_stripes) ? image->y0 + min_height : GST_VIDEO_FRAME_HEIGHT (frame); self->fill_image (image, frame); @@ -940,21 +1052,299 @@ seek_fn (OPJ_OFF_T p_nb_bytes, void *p_user_data) return OPJ_TRUE; } +static gboolean +gst_openjpeg_encode_is_last_subframe (GstVideoEncoder * enc, int stripe) +{ + GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (enc); + + return (stripe == self->num_stripes); +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_encode_message_new (GstOpenJPEGEnc * self, + GstVideoCodecFrame * frame, int num_stripe) +{ + GstOpenJPEGCodecMessage *message = g_slice_new0 (GstOpenJPEGCodecMessage); + + message->frame = gst_video_codec_frame_ref (frame); + message->stripe = num_stripe; + message->last_error = OPENJPEG_ERROR_NONE; + + return message; +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_encode_message_free (GstOpenJPEGCodecMessage * message) +{ + if (message) { + gst_video_codec_frame_unref (message->frame); + if (message->output_buffer) + gst_buffer_unref (message->output_buffer); + g_slice_free (GstOpenJPEGCodecMessage, message); + } + return NULL; +} + +#define ENCODE_ERROR(encode_params, err_code) { \ + encode_params->last_error = err_code; \ + goto done; \ +} + +/* callback method to be called asynchronously or not*/ +static void +gst_openjpeg_enc_encode_stripe (GstElement * element, gpointer user_data) +{ + GstOpenJPEGCodecMessage *message = (GstOpenJPEGCodecMessage *) user_data; + GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (element); + opj_codec_t *enc = NULL; + opj_stream_t *stream = NULL; + MemStream mstream; + opj_image_t *image = NULL; + GstVideoFrame vframe; + + GST_INFO_OBJECT (self, "Encode stripe %d/%d", message->stripe, + self->num_stripes); + + mstream.data = NULL; + enc = opj_create_compress (self->codec_format); + if (!enc) + ENCODE_ERROR (message, OPENJPEG_ERROR_INIT); + + if (G_UNLIKELY (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= + GST_LEVEL_TRACE)) { + opj_set_info_handler (enc, gst_openjpeg_enc_opj_info, self); + opj_set_warning_handler (enc, gst_openjpeg_enc_opj_warning, self); + opj_set_error_handler (enc, gst_openjpeg_enc_opj_error, self); + } else { + opj_set_info_handler (enc, NULL, NULL); + opj_set_warning_handler (enc, NULL, NULL); + opj_set_error_handler (enc, NULL, NULL); + } + if (!gst_video_frame_map (&vframe, &self->input_state->info, + message->frame->input_buffer, GST_MAP_READ)) + ENCODE_ERROR (message, OPENJPEG_ERROR_MAP_READ); + image = gst_openjpeg_enc_fill_image (self, &vframe, message->stripe); + gst_video_frame_unmap (&vframe); + if (!image) + ENCODE_ERROR (message, OPENJPEG_ERROR_FILL_IMAGE); + + if (vframe.info.finfo->flags & GST_VIDEO_FORMAT_FLAG_RGB) { + self->params.tcp_mct = 1; + } + opj_setup_encoder (enc, &self->params, image); + stream = opj_stream_create (4096, OPJ_FALSE); + if (!stream) + ENCODE_ERROR (message, OPENJPEG_ERROR_OPEN); + + mstream.allocsize = 4096; + mstream.data = g_malloc (mstream.allocsize); + mstream.offset = 0; + mstream.size = 0; + + opj_stream_set_read_function (stream, read_fn); + opj_stream_set_write_function (stream, write_fn); + opj_stream_set_skip_function (stream, skip_fn); + opj_stream_set_seek_function (stream, seek_fn); + opj_stream_set_user_data (stream, &mstream, NULL); + opj_stream_set_user_data_length (stream, mstream.size); + + if (!opj_start_compress (enc, image, stream)) + ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE); + + if (!opj_encode (enc, stream)) + ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE); + + if (!opj_end_compress (enc, stream)) + ENCODE_ERROR (message, OPENJPEG_ERROR_ENCODE); + + opj_image_destroy (image); + image = NULL; + opj_stream_destroy (stream); + stream = NULL; + opj_destroy_codec (enc); + enc = NULL; + + message->output_buffer = gst_buffer_new (); + + if (self->is_jp2c) { + GstMapInfo map; + GstMemory *mem; + + mem = gst_allocator_alloc (NULL, 8, NULL); + gst_memory_map (mem, &map, GST_MAP_WRITE); + GST_WRITE_UINT32_BE (map.data, mstream.size + 8); + GST_WRITE_UINT32_BE (map.data + 4, GST_MAKE_FOURCC ('j', 'p', '2', 'c')); + gst_memory_unmap (mem, &map); + gst_buffer_append_memory (message->output_buffer, mem); + } + + gst_buffer_append_memory (message->output_buffer, + gst_memory_new_wrapped (0, mstream.data, mstream.allocsize, 0, + mstream.size, mstream.data, (GDestroyNotify) g_free)); + message->last_error = OPENJPEG_ERROR_NONE; + + GST_INFO_OBJECT (self, + "Stripe %d encoded successfully, pass it to the streaming thread", + message->stripe); + +done: + if (message->last_error != OPENJPEG_ERROR_NONE) { + if (mstream.data) + g_free (mstream.data); + if (enc) + opj_destroy_codec (enc); + if (image) + opj_image_destroy (image); + if (stream) + opj_stream_destroy (stream); + } + if (!message->direct) { + GST_OBJECT_LOCK (self); + g_queue_push_tail (&self->messages, message); + g_cond_signal (&self->messages_cond); + GST_OBJECT_UNLOCK (self); + } + +} + +static GstOpenJPEGCodecMessage * +gst_openjpeg_enc_wait_for_new_message (GstOpenJPEGEnc * self) +{ + GstOpenJPEGCodecMessage *message = NULL; + + GST_OBJECT_LOCK (self); + while (g_queue_is_empty (&self->messages)) + g_cond_wait (&self->messages_cond, GST_OBJECT_GET_LOCK (self)); + message = g_queue_pop_head (&self->messages); + GST_OBJECT_UNLOCK (self); + + return message; +} + +static GstFlowReturn +gst_openjpeg_enc_encode_frame_multiple (GstVideoEncoder * encoder, + GstVideoCodecFrame * frame) +{ + GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder); + GstFlowReturn ret = GST_FLOW_OK; + guint i; + guint encoded_stripes = 0; + guint enqueued_stripes = 0; + GstOpenJPEGCodecMessage *message = NULL; + + /* The method receives a frame and split it into n stripes and + * and create a thread per stripe to encode it. + * As the number of stripes can be greater than the + * available threads to encode, there is two loop, one to + * count the enqueues stripes and one to count the encoded + * stripes. + */ + while (encoded_stripes < self->num_stripes) { + for (i = 1; + i <= self->available_threads + && enqueued_stripes < (self->num_stripes - encoded_stripes); i++) { + message = + gst_openjpeg_encode_message_new (self, frame, i + encoded_stripes); + GST_LOG_OBJECT (self, + "About to enqueue an encoding message from frame %p stripe %d", frame, + message->stripe); + gst_element_call_async (GST_ELEMENT (self), + (GstElementCallAsyncFunc) gst_openjpeg_enc_encode_stripe, message, + NULL); + enqueued_stripes++; + } + while (enqueued_stripes > 0) { + message = gst_openjpeg_enc_wait_for_new_message (self); + if (!message) + continue; + enqueued_stripes--; + if (message->last_error == OPENJPEG_ERROR_NONE) { + GST_LOG_OBJECT (self, + "About to push frame %p stripe %d", frame, message->stripe); + frame->output_buffer = gst_buffer_ref (message->output_buffer); + if (gst_openjpeg_encode_is_last_subframe (encoder, encoded_stripes + 1)) { + GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame); + ret = gst_video_encoder_finish_frame (encoder, frame); + } else + ret = gst_video_encoder_finish_subframe (encoder, frame); + if (ret != GST_FLOW_OK) { + GST_WARNING_OBJECT + (self, "An error occurred pushing the frame %s", + gst_flow_get_name (ret)); + goto done; + } + encoded_stripes++; + message = gst_openjpeg_encode_message_free (message); + } else { + GST_WARNING_OBJECT + (self, "An error occurred %d during the JPEG encoding", + message->last_error); + gst_video_codec_frame_unref (frame); + self->last_error = message->last_error; + ret = GST_FLOW_ERROR; + goto done; + } + } + } + +done: + gst_openjpeg_encode_message_free (message); + return ret; +} + +static GstFlowReturn +gst_openjpeg_enc_encode_frame_single (GstVideoEncoder * encoder, + GstVideoCodecFrame * frame) +{ + GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder); + GstFlowReturn ret = GST_FLOW_OK; + guint i; + GstOpenJPEGCodecMessage *message = NULL; + + for (i = 1; i <= self->num_stripes; ++i) { + message = gst_openjpeg_encode_message_new (self, frame, i); + message->direct = TRUE; + gst_openjpeg_enc_encode_stripe (GST_ELEMENT (self), message); + if (message->last_error != OPENJPEG_ERROR_NONE) { + GST_WARNING_OBJECT + (self, "An error occured %d during the JPEG encoding", + message->last_error); + gst_video_codec_frame_unref (frame); + self->last_error = message->last_error; + ret = GST_FLOW_ERROR; + goto done; + } + frame->output_buffer = gst_buffer_ref (message->output_buffer); + if (gst_openjpeg_encode_is_last_subframe (encoder, message->stripe)) { + GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame); + ret = gst_video_encoder_finish_frame (encoder, frame); + } else + ret = gst_video_encoder_finish_subframe (encoder, frame); + if (ret != GST_FLOW_OK) { + GST_WARNING_OBJECT + (self, "An error occurred pushing the frame %s", + gst_flow_get_name (ret)); + goto done; + } + message = gst_openjpeg_encode_message_free (message); + } + +done: + gst_openjpeg_encode_message_free (message); + return ret; +} + static GstFlowReturn gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder, GstVideoCodecFrame * frame) { GstOpenJPEGEnc *self = GST_OPENJPEG_ENC (encoder); GstFlowReturn ret = GST_FLOW_OK; - opj_codec_t *enc; - opj_stream_t *stream; - MemStream mstream; - opj_image_t *image; GstVideoFrame vframe; - guint i; + GstCaps *current_caps; GstStructure *s; - gboolean stripe_mode = + gboolean subframe_mode = self->num_stripes != GST_OPENJPEG_ENC_DEFAULT_NUM_STRIPES; GST_DEBUG_OBJECT (self, "Handling frame"); @@ -962,7 +1352,7 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder, current_caps = gst_pad_get_current_caps (GST_VIDEO_ENCODER_SRC_PAD (encoder)); s = gst_caps_get_structure (current_caps, 0); - if (stripe_mode) { + if (subframe_mode) { const gchar *str = gst_structure_get_string (s, "alignment"); gint min_res; @@ -980,9 +1370,8 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder, if (!gst_video_frame_map (&vframe, &self->input_state->info, frame->input_buffer, GST_MAP_READ)) { gst_video_codec_frame_unref (frame); - GST_ELEMENT_ERROR (self, CORE, FAILED, - ("Failed to map input buffer"), (NULL)); - return GST_FLOW_ERROR; + self->last_error = OPENJPEG_ERROR_MAP_READ; + goto error; } /* find stripe with least height */ min_res = @@ -995,146 +1384,42 @@ gst_openjpeg_enc_handle_frame (GstVideoEncoder * encoder, self->params.numresolution = MIN (min_res + 1, self->params.numresolution); gst_video_frame_unmap (&vframe); } - - - for (i = 0; i < self->num_stripes; ++i) { - enc = opj_create_compress (self->codec_format); - if (!enc) - goto initialization_error; - - if (G_UNLIKELY (gst_debug_category_get_threshold (GST_CAT_DEFAULT) >= - GST_LEVEL_TRACE)) { - opj_set_info_handler (enc, gst_openjpeg_enc_opj_info, self); - opj_set_warning_handler (enc, gst_openjpeg_enc_opj_warning, self); - opj_set_error_handler (enc, gst_openjpeg_enc_opj_error, self); - } else { - opj_set_info_handler (enc, NULL, NULL); - opj_set_warning_handler (enc, NULL, NULL); - opj_set_error_handler (enc, NULL, NULL); - } - - if (!gst_video_frame_map (&vframe, &self->input_state->info, - frame->input_buffer, GST_MAP_READ)) - goto map_read_error; - - image = gst_openjpeg_enc_fill_image (self, &vframe, i); - if (!image) - goto fill_image_error; - gst_video_frame_unmap (&vframe); - - if (vframe.info.finfo->flags & GST_VIDEO_FORMAT_FLAG_RGB) { - self->params.tcp_mct = 1; - } - opj_setup_encoder (enc, &self->params, image); - stream = opj_stream_create (4096, OPJ_FALSE); - if (!stream) - goto open_error; - - mstream.allocsize = 4096; - mstream.data = g_malloc (mstream.allocsize); - mstream.offset = 0; - mstream.size = 0; - - opj_stream_set_read_function (stream, read_fn); - opj_stream_set_write_function (stream, write_fn); - opj_stream_set_skip_function (stream, skip_fn); - opj_stream_set_seek_function (stream, seek_fn); - opj_stream_set_user_data (stream, &mstream, NULL); - opj_stream_set_user_data_length (stream, mstream.size); - - if (!opj_start_compress (enc, image, stream)) - goto encode_error; - - if (!opj_encode (enc, stream)) - goto encode_error; - - if (!opj_end_compress (enc, stream)) - goto encode_error; - - opj_image_destroy (image); - opj_stream_destroy (stream); - opj_destroy_codec (enc); - - frame->output_buffer = gst_buffer_new (); - - if (self->is_jp2c) { - GstMapInfo map; - GstMemory *mem; - - mem = gst_allocator_alloc (NULL, 8, NULL); - gst_memory_map (mem, &map, GST_MAP_WRITE); - GST_WRITE_UINT32_BE (map.data, mstream.size + 8); - GST_WRITE_UINT32_BE (map.data + 4, GST_MAKE_FOURCC ('j', 'p', '2', 'c')); - gst_memory_unmap (mem, &map); - gst_buffer_append_memory (frame->output_buffer, mem); - } - - gst_buffer_append_memory (frame->output_buffer, - gst_memory_new_wrapped (0, mstream.data, mstream.allocsize, 0, - mstream.size, mstream.data, (GDestroyNotify) g_free)); - - GST_VIDEO_CODEC_FRAME_SET_SYNC_POINT (frame); - ret = - (i == - self->num_stripes - - 1) ? gst_video_encoder_finish_frame (encoder, - frame) : gst_video_encoder_finish_subframe (encoder, frame); - - } - + if (self->encode_frame (encoder, frame) != GST_FLOW_OK) + goto error; done: if (current_caps) gst_caps_unref (current_caps); return ret; -initialization_error: - { - gst_video_codec_frame_unref (frame); - GST_ELEMENT_ERROR (self, LIBRARY, INIT, - ("Failed to initialize OpenJPEG encoder"), (NULL)); - return GST_FLOW_ERROR; - } -map_read_error: - { - opj_destroy_codec (enc); - gst_video_codec_frame_unref (frame); - - GST_ELEMENT_ERROR (self, CORE, FAILED, - ("Failed to map input buffer"), (NULL)); - return GST_FLOW_ERROR; - } -fill_image_error: - { - opj_destroy_codec (enc); - gst_video_frame_unmap (&vframe); - gst_video_codec_frame_unref (frame); - - GST_ELEMENT_ERROR (self, LIBRARY, INIT, - ("Failed to fill OpenJPEG image"), (NULL)); - return GST_FLOW_ERROR; - } -open_error: - { - opj_image_destroy (image); - opj_destroy_codec (enc); - gst_video_codec_frame_unref (frame); - - GST_ELEMENT_ERROR (self, LIBRARY, INIT, - ("Failed to open OpenJPEG data"), (NULL)); - return GST_FLOW_ERROR; - } -encode_error: - { - opj_stream_destroy (stream); - g_free (mstream.data); - opj_image_destroy (image); - opj_destroy_codec (enc); - gst_video_codec_frame_unref (frame); - - GST_ELEMENT_ERROR (self, STREAM, ENCODE, - ("Failed to encode OpenJPEG stream"), (NULL)); - return GST_FLOW_ERROR; +error: + switch (self->last_error) { + case OPENJPEG_ERROR_INIT: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to initialize OpenJPEG encoder"), (NULL)); + break; + case OPENJPEG_ERROR_MAP_READ: + GST_ELEMENT_ERROR (self, CORE, FAILED, + ("Failed to map input buffer"), (NULL)); + break; + case OPENJPEG_ERROR_FILL_IMAGE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to fill OpenJPEG image"), (NULL)); + break; + case OPENJPEG_ERROR_OPEN: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to open OpenJPEG data"), (NULL)); + break; + case OPENJPEG_ERROR_ENCODE: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to encode OpenJPEG data"), (NULL)); + break; + default: + GST_ELEMENT_ERROR (self, LIBRARY, INIT, + ("Failed to encode OpenJPEG data"), (NULL)); + break; } + gst_openjpeg_enc_flush_messages (self); + return GST_FLOW_ERROR; } static gboolean diff --git a/ext/openjpeg/gstopenjpegenc.h b/ext/openjpeg/gstopenjpegenc.h index 754b3b4cf..a2c81bb96 100644 --- a/ext/openjpeg/gstopenjpegenc.h +++ b/ext/openjpeg/gstopenjpegenc.h @@ -55,9 +55,17 @@ struct _GstOpenJPEGEnc gboolean is_jp2c; void (*fill_image) (opj_image_t * image, GstVideoFrame *frame); + gboolean (*encode_frame) (GstVideoEncoder * encoder, GstVideoCodecFrame *frame); opj_cparameters_t params; gint num_stripes; + + guint available_threads; + GQueue messages; + + GCond messages_cond; + + OpenJPEGErrorCode last_error; }; struct _GstOpenJPEGEncClass -- cgit v1.2.1