summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosep Torra <n770galaxy@gmail.com>2013-09-28 13:32:37 +0200
committerJosep Torra <n770galaxy@gmail.com>2013-09-28 13:32:37 +0200
commitc618a0a5790ba6cd086555fb79cd6d982e85de0d (patch)
treed456e2e74015348194b9e12b9add882edcec8c21
parent46462362e89208048aadd44c4fccba1383ecd64d (diff)
downloadgst-omx-c618a0a5790ba6cd086555fb79cd6d982e85de0d.tar.gz
examples: simplify the thread synchronization code
Make everithing more simple and fix the races conditions remaining in the previous approaches.
-rw-r--r--examples/egl/testegl.c48
1 files changed, 17 insertions, 31 deletions
diff --git a/examples/egl/testegl.c b/examples/egl/testegl.c
index fe24239..0391230 100644
--- a/examples/egl/testegl.c
+++ b/examples/egl/testegl.c
@@ -75,6 +75,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define M_PI 3.141592654
#endif
+#define SYNC_BUFFERS TRUE
+
#define TRACE_VC_MEMORY_ENABLED 0
#if TRACE_VC_MEMORY_ENABLED
@@ -155,7 +157,6 @@ typedef struct
/* Interthread comunication */
GAsyncQueue *queue;
GMutex *queue_lock;
- GMutex *flow_lock;
GCond *cond;
gboolean flushing;
GstMiniObject *popped_obj;
@@ -659,7 +660,6 @@ init_intercom (APP_STATE_T * state)
state->queue =
g_async_queue_new_full ((GDestroyNotify) gst_mini_object_unref);
state->queue_lock = g_mutex_new ();
- state->flow_lock = g_mutex_new ();
state->cond = g_cond_new ();
}
@@ -675,10 +675,6 @@ terminate_intercom (APP_STATE_T * state)
g_mutex_free (state->queue_lock);
}
- if (state->flow_lock) {
- g_mutex_free (state->flow_lock);
- }
-
if (state->cond) {
g_cond_free (state->cond);
}
@@ -703,7 +699,6 @@ flush_start (APP_STATE_T * state)
g_cond_broadcast (state->cond);
g_mutex_unlock (state->queue_lock);
- g_mutex_lock (state->flow_lock);
while ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
gst_mini_object_unref (object);
}
@@ -711,7 +706,6 @@ flush_start (APP_STATE_T * state)
flush_internal (state);
state->popped_obj = NULL;
g_mutex_unlock (state->queue_lock);
- g_mutex_unlock (state->flow_lock);
}
static void
@@ -785,25 +779,19 @@ static gboolean
handle_queued_objects (APP_STATE_T * state)
{
GstMiniObject *object = NULL;
- gboolean done = FALSE;
g_mutex_lock (state->queue_lock);
if (state->flushing) {
g_cond_broadcast (state->cond);
- done = TRUE;
+ goto beach;
} else if (g_async_queue_length (state->queue) == 0) {
- done = TRUE;
+ goto beach;
}
- g_mutex_unlock (state->queue_lock);
-
- while (!done
- && (object =
- GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
+ if ((object = GST_MINI_OBJECT_CAST (g_async_queue_try_pop (state->queue)))) {
if (GST_IS_BUFFER (object)) {
GstBuffer *buffer = GST_BUFFER_CAST (object);
- g_mutex_lock (state->queue_lock);
if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_PREROLL)) {
g_print ("got a reclaim buffer\n");
flush_internal (state);
@@ -811,8 +799,10 @@ handle_queued_objects (APP_STATE_T * state)
update_image (state, buffer);
}
gst_buffer_unref (buffer);
- g_mutex_unlock (state->queue_lock);
render_scene (state);
+ if (!SYNC_BUFFERS) {
+ object = NULL;
+ }
} else if (GST_IS_MESSAGE (object)) {
GstMessage *message = GST_MESSAGE_CAST (object);
g_print ("\nmessage %p ", message);
@@ -839,7 +829,6 @@ handle_queued_objects (APP_STATE_T * state)
g_print ("\nevent %p %s\n", event,
gst_event_type_get_name (GST_EVENT_TYPE (event)));
- g_mutex_lock (state->queue_lock);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_EOS:
flush_internal (state);
@@ -847,18 +836,19 @@ handle_queued_objects (APP_STATE_T * state)
default:
break;
}
- g_mutex_unlock (state->queue_lock);
gst_event_unref (event);
+ object = NULL;
}
+ }
- g_mutex_lock (state->queue_lock);
+ if (object) {
state->popped_obj = object;
g_cond_broadcast (state->cond);
- g_mutex_unlock (state->queue_lock);
- g_mutex_lock (state->flow_lock);
- g_mutex_unlock (state->flow_lock);
}
+beach:
+ g_mutex_unlock (state->queue_lock);
+
return FALSE;
}
@@ -867,14 +857,13 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
{
gboolean res = TRUE;
- g_mutex_lock (state->flow_lock);
+ g_mutex_lock (state->queue_lock);
if (state->flushing) {
gst_mini_object_unref (obj);
res = FALSE;
goto beach;
}
- g_mutex_lock (state->queue_lock);
g_async_queue_push (state->queue, obj);
if (synchronous) {
@@ -883,10 +872,9 @@ queue_object (APP_STATE_T * state, GstMiniObject * obj, gboolean synchronous)
g_cond_wait (state->cond, state->queue_lock);
} while (!state->flushing && state->popped_obj != obj);
}
- g_mutex_unlock (state->queue_lock);
beach:
- g_mutex_unlock (state->flow_lock);
+ g_mutex_unlock (state->queue_lock);
return res;
}
@@ -903,10 +891,8 @@ buffers_cb (GstElement * fakesink, GstBuffer * buffer, GstPad * pad,
gpointer user_data)
{
APP_STATE_T *state = (APP_STATE_T *) user_data;
- gboolean is_reclaim =
- GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_PREROLL);
queue_object (state, GST_MINI_OBJECT_CAST (gst_buffer_ref (buffer)),
- !is_reclaim);
+ SYNC_BUFFERS);
}
static gboolean