diff options
author | Benjamin Otte <otte@gnome.org> | 2002-03-17 00:50:42 +0000 |
---|---|---|
committer | Benjamin Otte <otte@gnome.org> | 2002-03-17 00:50:42 +0000 |
commit | 0aa86db75c7c0894432caa17971bd629506d1346 (patch) | |
tree | 35042b293dc601b4f7aed3b45ac044b9cee10632 | |
parent | df68257f9499fdaafbc57a0a09585a66458182e9 (diff) | |
download | gstreamer-0aa86db75c7c0894432caa17971bd629506d1346.tar.gz |
- rewrite of GstQueue, hope it works now, though I'm not absolutely sure there couldn'T be a deadlock
Original commit message from CVS:
- rewrite of GstQueue, hope it works now, though I'm not absolutely sure there couldn'T be a deadlock
- fixes to GstThread (this thing has to work without calling queues\!)
- changes to scheduler (use GAsyncQueue for events)
-rw-r--r-- | gst/gstdata.c | 2 | ||||
-rw-r--r-- | gst/gstdata.h | 4 | ||||
-rw-r--r-- | gst/gstevent.c | 4 | ||||
-rw-r--r-- | gst/gstevent.h | 1 | ||||
-rw-r--r-- | gst/gstqueue.c | 539 | ||||
-rw-r--r-- | gst/gstqueue.h | 37 | ||||
-rw-r--r-- | gst/gstthread.c | 7 | ||||
-rw-r--r-- | gst/schedulers/gstbasicscheduler.c | 45 | ||||
-rw-r--r-- | plugins/elements/gstqueue.c | 539 | ||||
-rw-r--r-- | plugins/elements/gstqueue.h | 37 |
10 files changed, 706 insertions, 509 deletions
diff --git a/gst/gstdata.c b/gst/gstdata.c index b36c3faa94..9fb6aefde3 100644 --- a/gst/gstdata.c +++ b/gst/gstdata.c @@ -22,7 +22,7 @@ gst_data_init (GstData *data) data->flags = 0; for (i = 0; i < GST_OFFSET_TYPES; i++) { - data->offset[i] = 0; + data->offset[i] = GST_OFFSET_INVALID; } } void diff --git a/gst/gstdata.h b/gst/gstdata.h index 6c922c9384..759d5186cd 100644 --- a/gst/gstdata.h +++ b/gst/gstdata.h @@ -65,7 +65,9 @@ typedef enum { } GstDataType; /* number of types */ -#define GST_OFFSET_TYPES 3 +#define GST_OFFSET_TYPES 3 /* number of different GstOffsetType types */ +#define GST_OFFSET_INVALID (~((guint64) 0)) /* invalid (or unitialized) offset */ +#define GST_OFFSET_IS_INVLAID(ofset) (offset == GST_OFFSET_INVALID) typedef enum { GST_OFFSET_TIME = 0, GST_OFFSET_BYTES = 1, diff --git a/gst/gstevent.c b/gst/gstevent.c index 4e301c815d..231e15aab5 100644 --- a/gst/gstevent.c +++ b/gst/gstevent.c @@ -129,7 +129,7 @@ gst_event_seek_init (GstEventSeek *event, GstSeekType type, GstOffsetType offset event->original = offset_type; for (i = 0; i < GST_OFFSET_TYPES; i++) { - event->offset[i] = 0; + event->offset[i] = GST_OFFSET_INVALID; event->accuracy[i] = GST_ACCURACY_NONE; } event->flush = TRUE; @@ -275,7 +275,7 @@ gst_event_length_init (GstEventLength *event) for (i = 0; i < GST_OFFSET_TYPES; i++) { event->accuracy[i] = GST_ACCURACY_NONE; - event->length[i] = 0; + event->length[i] = GST_OFFSET_INVALID; } } /** diff --git a/gst/gstevent.h b/gst/gstevent.h index febcd8a153..eaf92ab3dc 100644 --- a/gst/gstevent.h +++ b/gst/gstevent.h @@ -58,6 +58,7 @@ typedef enum { #define GST_EVENT_UNLOCK(event) ((GstEventUnLock *) (event)) #define GST_EVENT_SEEK_TYPE(event) (GST_EVENT_SEEK(event)->type) +#define GST_EVENT_SEEK_FLUSH(event) (GST_EVENT_SEEK(event)->flush) typedef struct _GstEventEOS GstEventEOS; typedef struct _GstEventDiscontinuous GstEventDiscontinuous; diff --git a/gst/gstqueue.c b/gst/gstqueue.c index c60803a110..9243490b44 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -56,18 +56,39 @@ enum { enum { ARG_0, - ARG_LEVEL_BUFFERS, - ARG_LEVEL_BYTES, - ARG_LEVEL_TIME, - ARG_SIZE_BUFFERS, - ARG_SIZE_BYTES, - ARG_SIZE_TIME, ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, ARG_MAY_DEADLOCK, + ARG_MAX_WAIT, }; +/* FIXME: I don't want to copy from glib */ +struct _GAsyncQueue +{ + GMutex *mutex; + GCond *cond; + GQueue *queue; + guint waiting_threads; + guint ref_count; +}; + +static gpointer +g_async_queue_peek_unlocked (GAsyncQueue *queue) +{ + return g_queue_peek_tail (queue->queue); +} +/* static gpointer +g_async_queue_peek (GAsyncQueue *queue) +{ + gpointer retval; + + g_mutex_lock (queue->mutex); + retval = g_async_queue_peek_unlocked (queue); + g_mutex_unlock (queue->mutex); + + return retval; +}*/ static void gst_queue_class_init (GstQueueClass *klass); static void gst_queue_init (GstQueue *queue); @@ -79,12 +100,13 @@ static void gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_queue_chain (GstPad *pad, GstData *buf); -static GstBuffer * gst_queue_get (GstPad *pad); +static GstData * gst_queue_get (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static gpointer gst_queue_upstream_event (GstPad *pad, GstData *event); static void gst_queue_locked_flush (GstQueue *queue); -/*static void gst_queue_flush (GstQueue *queue); -*/ +static void gst_queue_flush (GstQueue *queue); + static GstElementStateReturn gst_queue_change_state (GstElement *element); @@ -148,11 +170,14 @@ gst_queue_class_init (GstQueueClass *klass) g_param_spec_int ("level", "Level", "How many buffers are in the queue.", 0, G_MAXINT, 0, G_PARAM_READABLE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, - g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", - 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_param_spec_int ("max-level", "Maximum Level", "How many buffers the queue holds.", + 1, G_MAXINT, 100, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, - g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", + g_param_spec_boolean ("may-deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", TRUE, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_WAIT, + g_param_spec_ulong ("max-wait", "Max Wait", "How long the queue will wait", + 1, 10 * 1000 * 1000, 10 * 1000, G_PARAM_READWRITE)); gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); @@ -208,23 +233,22 @@ gst_queue_init (GstQueue *queue) gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); - + gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_upstream_event)); + queue->leaky = GST_QUEUE_NO_LEAK; - queue->queue = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = 0LL; - queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = 1000000000LL; /* 1sec */ + queue->queue = g_async_queue_new (); + queue->not_full = g_cond_new (); + queue->need_src = GST_QUEUE_NEED_NOTHING; + queue->need_sink = GST_QUEUE_NEED_NOTHING; + + queue->size = 100; /* 100 buffers */ queue->may_deadlock = TRUE; + queue->max_wait = 10 * 1000; /* wait max 10 milliseconds */ - queue->qlock = g_mutex_new (); - queue->reader = FALSE; - queue->writer = FALSE; - queue->not_empty = g_cond_new (); - queue->not_full = g_cond_new (); - GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); + queue->upstream_event = NULL; + queue->upstream_return = NULL; + queue->upstream_mutex = g_mutex_new(); + queue->upstream_cond = g_cond_new(); } static void @@ -232,9 +256,14 @@ gst_queue_dispose (GObject *object) { GstQueue *queue = GST_QUEUE (object); - g_mutex_free (queue->qlock); - g_cond_free (queue->not_empty); + if (queue->upstream_event || queue->upstream_return) + g_warning ("losing event while disposing queue"); + g_cond_free (queue->upstream_cond); + g_mutex_free (queue->upstream_mutex); + g_cond_free (queue->not_full); + g_async_queue_unref (queue->queue); + G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -250,39 +279,42 @@ gst_queue_get_bufferpool (GstPad *pad) } static void -gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) -{ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data); - - gst_data_unref (GST_DATA (data)); -} - -static void gst_queue_locked_flush (GstQueue *queue) { - g_list_foreach (queue->queue, gst_queue_cleanup_buffers, - (gpointer) queue); - g_list_free (queue->queue); - - queue->queue = NULL; - queue->level_buffers = 0; - queue->timeval = NULL; + GstData *data; + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "cleaning queue\n"); + while ((data = GST_DATA (g_async_queue_try_pop_unlocked (queue->queue))) != NULL) + { + switch (GST_DATA_TYPE (data)) + { + case GST_EVENT_NEWMEDIA: + queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA; + break; + case GST_EVENT_EOS: + queue->need_sink |= GST_QUEUE_NEED_EOS; + break; + default: + queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS; + break; + } + gst_data_unref (data); + } + queue->need_sink |= queue->need_src; + queue->need_src = GST_QUEUE_NEED_NOTHING; + g_cond_signal (queue->not_full); } -/* warning: `gst_queue_flush' defined but not used static void gst_queue_flush (GstQueue *queue) { - g_mutex_lock (queue->qlock); + g_async_queue_lock (queue->queue); gst_queue_locked_flush (queue); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); } -*/ - static void gst_queue_chain (GstPad *pad, GstData *buf) { GstQueue *queue; - gboolean reader; g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); @@ -291,134 +323,136 @@ gst_queue_chain (GstPad *pad, GstData *buf) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); restart: - /* we have to lock the queue since we span threads */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); - g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ()); - if (GST_IS_EVENT (buf)) { - switch (GST_DATA_TYPE (buf)) { - case GST_EVENT_FLUSH: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); - gst_queue_locked_flush (queue); + /* handle events */ + if (queue->upstream_event) + { + g_print ("handling event\n"); + g_mutex_lock (queue->upstream_mutex); + GstPad *peer = GST_PAD_PEER (pad); + + if (peer) + { + queue->upstream_return = gst_pad_send_event (peer, queue->upstream_event); + } else { + gst_data_unref (queue->upstream_event); + } + queue->upstream_event = NULL; + g_cond_signal (queue->upstream_cond); + g_mutex_unlock (queue->upstream_mutex); + g_print ("done handling event\n"); + } + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n", buf, GST_BUFFER_SIZE(buf)); + + /* if we leak on the upstream side, drop the current buffer */ + if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM && g_async_queue_length (queue->queue) >= queue->size) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); + /* check if we have to schedule an event for when the stream gets empty */ + switch (GST_DATA_TYPE (buf)) + { + case GST_EVENT_NEWMEDIA: + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA; + g_async_queue_unlock (queue->queue); + break; + case GST_EVENT_DISCONTINUOUS: + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS; + g_async_queue_unlock (queue->queue); break; case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", - GST_ELEMENT_NAME (queue), queue->level_buffers); + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_EOS; + g_async_queue_unlock (queue->queue); break; default: - /* gst_pad_event_default (pad, GST_EVENT (buf));*/ break; } - } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf)); - - if (queue->level_buffers == queue->size_buffers) { - /* if this is a leaky queue... */ - if (queue->leaky) { - /* FIXME don't want to leak events! */ - /* if we leak on the upstream side, drop the current buffer */ - if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - if (GST_IS_EVENT (buf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_DATA_TYPE(buf)); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - gst_data_unref(buf); - /* now we have to clean up and exit right away */ - g_mutex_unlock (queue->qlock); - return; - } - /* otherwise we have to push a buffer off the other end */ - else { - GList *front; - GstData *leakbuf; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); - front = queue->queue; - leakbuf = (GstData *)(front->data); - if (GST_IS_EVENT (leakbuf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_DATA_TYPE(leakbuf)); - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); + gst_data_unref(buf); + /* now we have to exit right away */ + return; + } + else if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) + { + /* while there are too much buffers */ + while (g_async_queue_length (queue->queue) >= queue->size) + { + GstData *leakbuf; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); + g_async_queue_lock (queue->queue); + leakbuf = (GstData *) g_async_queue_try_pop_unlocked (queue->queue); + if (leakbuf != NULL) + { + switch (GST_DATA_TYPE (buf)) + { + case GST_EVENT_NEWMEDIA: + queue->need_src |= GST_QUEUE_NEED_NEWMEDIA; + break; + case GST_EVENT_EOS: + queue->need_src |= GST_QUEUE_NEED_EOS; + break; + default: + queue->need_src |= GST_QUEUE_NEED_DISCONTINUOUS; + break; + } gst_data_unref (leakbuf); - queue->queue = g_list_remove_link (queue->queue, front); - g_list_free (front); } + g_async_queue_unlock (queue->queue); } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == queue->size_buffers) { - /* if there's a pending state change for this queue or its manager, switch */ - /* back to iterator so bottom half of state change executes */ - while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); - g_mutex_unlock (queue->qlock); - if (gst_element_interrupt (GST_ELEMENT (queue))) - return; - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - /* try to signal to resolve the error */ - if (!queue->may_deadlock) { - gst_data_unref (buf); - g_mutex_unlock (queue->qlock); - gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); - return; - } - else { - gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); - } + } + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", + g_async_queue_length (queue->queue), queue->size); + /* while there isn't enough space available */ + if (g_async_queue_length (queue->queue) >= queue->size) + { + /* if there's a pending state change for this queue or its manager, switch */ + /* back to iterator so bottom half of state change executes */ + while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return; + goto restart; + } + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + /* try to signal to resolve the error */ + if (!queue->may_deadlock) { + gst_data_unref (buf); + gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); + return; + } else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->writer) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n"); - queue->writer = TRUE; - g_cond_wait (queue->not_full, queue->qlock); - queue->writer = FALSE; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); + + /* FIXME: we're poking in AsyncQueue internals here */ + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size); + g_print ("waiting for not null\n"); + g_mutex_lock (queue->upstream_mutex); + if (queue->upstream_event == NULL) + g_cond_wait (queue->not_full, queue->upstream_mutex); + g_mutex_unlock (queue->upstream_mutex); + g_print ("not null\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); + goto restart; } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size); /* put the buffer on the tail of the list */ - queue->queue = g_list_append (queue->queue, buf); - queue->level_buffers++; - queue->level_bytes += GST_BUFFER_SIZE(buf); + g_async_queue_push (queue->queue, buf); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + GST_DEBUG_PAD_NAME(pad), g_async_queue_length (queue->queue), queue->size); - /* this assertion _has_ to hold */ - /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */ - - /* reader waiting on an empty queue */ - reader = queue->reader; - - g_mutex_unlock (queue->qlock); - - if (reader) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n"); - g_cond_signal (queue->not_empty); - } } -static GstBuffer * +static GstData * gst_queue_get (GstPad *pad) { GstQueue *queue; - GstBuffer *buf = NULL; - GList *front; - gboolean writer; + GstData *data; g_assert(pad != NULL); g_assert(GST_IS_PAD(pad)); @@ -428,27 +462,58 @@ gst_queue_get (GstPad *pad) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); restart: - /* have to lock for thread-safety */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); - g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty); + + /* make sure we're not in the middle of a state change */ + while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return NULL; + } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == 0) { + g_async_queue_lock (queue->queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + if (queue->need_src) + { + GstData *event; + switch (queue->need_src) + { + case GST_QUEUE_NEED_NEWMEDIA: + event = GST_DATA (gst_event_new_newmedia ()); + break; + case GST_QUEUE_NEED_EOS: + event = GST_DATA (gst_event_new_eos ()); + break; + case GST_QUEUE_NEED_DISCONTINUOUS: + event = GST_DATA (gst_event_new_discontinuous ()); + break; + default: + event = NULL; + g_assert_not_reached (); + break; + } + data = g_async_queue_peek_unlocked (queue->queue); + if (data) + { + guint i; + for (i = 0; i < GST_OFFSET_TYPES; i++) + { + event->offset[i] = data->offset[i]; + } + } + queue->need_src = GST_QUEUE_NEED_NOTHING; + g_async_queue_unlock (queue->queue); + return event; + } + if ((data = g_async_queue_pop_unlocked (queue->queue)) == NULL) + { + GTimeVal timeval; /* if there's a pending state change for this queue or its manager, switch * back to iterator so bottom half of state change executes */ - while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); - g_mutex_unlock (queue->qlock); - if (gst_element_interrupt (GST_ELEMENT (queue))) - return NULL; - goto restart; - } if (GST_STATE (queue) != GST_STATE_PLAYING) { /* this means the other end is shut down */ if (!queue->may_deadlock) { - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); goto restart; } @@ -457,56 +522,37 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->reader) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n"); - queue->reader = TRUE; - g_cond_wait (queue->not_empty, queue->qlock); - queue->reader = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + g_get_current_time (&timeval); + g_time_val_add (&timeval, queue->max_wait); + g_print ("waiting for data\n"); + data = g_async_queue_timed_pop_unlocked (queue->queue, &timeval); + g_print ("%s\n", data == NULL ? "timer goes off while waiting for data\n" : "data available\n"); + if (data == NULL) + { + g_async_queue_unlock (queue->queue); + gst_element_yield (GST_ELEMENT (data)); + goto restart; + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - - front = queue->queue; - buf = (GstBuffer *)(front->data); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf); - queue->queue = g_list_remove_link (queue->queue, front); - g_list_free (front); - - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(buf); - + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + g_async_queue_unlock (queue->queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); - - /* this assertion _has_ to hold */ - /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */ + g_async_queue_length (queue->queue), queue->size); - /* writer waiting on a full queue */ - writer = queue->writer; - - g_mutex_unlock (queue->qlock); - - if (writer) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n"); - g_cond_signal (queue->not_full); - } + g_cond_signal (queue->not_full); /* FIXME where should this be? locked? */ - if (GST_IS_EVENT(buf)) { - switch (GST_DATA_TYPE(buf)) { - case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue)); - gst_element_set_eos (GST_ELEMENT (queue)); - break; - default: - break; - } + if (GST_DATA_TYPE(data) == GST_EVENT_EOS) + { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue)); + gst_element_set_eos (GST_ELEMENT (queue)); } - return buf; + return data; } static GstElementStateReturn @@ -524,13 +570,12 @@ gst_queue_change_state (GstElement *element) /* lock the queue so another thread (not in sync with this thread's state) * can't call this queue's _get (or whatever) */ - g_mutex_lock (queue->qlock); + g_async_queue_lock (queue->queue); new_state = GST_STATE_PENDING (element); if (new_state == GST_STATE_PAUSED) { - //g_cond_signal (queue->not_full); - //g_cond_signal (queue->not_empty); + /* g_cond_signal (queue->not_full); */ } else if (new_state == GST_STATE_READY) { gst_queue_locked_flush (queue); @@ -538,16 +583,14 @@ gst_queue_change_state (GstElement *element) else if (new_state == GST_STATE_PLAYING) { if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) { /* FIXME can this be? */ - if (queue->reader) - g_cond_signal (queue->not_empty); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); return GST_STATE_FAILURE; } } ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element)); return ret; @@ -567,12 +610,19 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa switch (prop_id) { case ARG_LEAKY: queue->leaky = g_value_get_enum (value); + g_object_notify (object, "leaky"); break; case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int (value); + queue->size = g_value_get_int (value); + g_object_notify (object, "max-level"); break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); + g_object_notify (object, "may-deadlock"); + break; + case ARG_MAX_WAIT: + queue->max_wait = g_value_get_ulong (value); + g_object_notify (object, "max-wait"); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -595,16 +645,67 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe g_value_set_enum (value, queue->leaky); break; case ARG_LEVEL: - g_value_set_int (value, queue->level_buffers); + g_value_set_int (value, g_async_queue_length (queue->queue)); break; case ARG_MAX_LEVEL: - g_value_set_int (value, queue->size_buffers); + g_value_set_int (value, queue->size); break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; + case ARG_MAX_WAIT: + g_value_set_ulong (value, queue->max_wait); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static gpointer +gst_queue_upstream_event (GstPad *pad, GstData *event) +{ + GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); + gpointer *ret = NULL; + + /* check that everything is ok. */ + if (queue->upstream_event || queue->upstream_return) + { + g_warning ("unhandled event in queue."); + queue->upstream_return = NULL; + } + + /* transfer event */ + g_mutex_lock (queue->upstream_mutex); + gst_data_ref (event); + queue->upstream_event = event; + g_cond_signal (queue->not_full); + g_print ("waiting for handling of upstream event\n"); + while (queue->upstream_event != NULL) + g_cond_wait (queue->upstream_cond, queue->upstream_mutex); + g_print ("upstream event handled\n"); + ret = queue->upstream_return; + queue->upstream_return = NULL; + g_mutex_unlock (queue->upstream_mutex); + + /* handle event */ + if (ret != NULL) + { + switch (GST_DATA_TYPE (event)) + { + case GST_EVENT_SEEK: + if (GST_EVENT_SEEK_FLUSH(event)) + gst_queue_flush (queue); + break; + case GST_EVENT_FLUSH: + gst_queue_flush (queue); + break; + default: + break; + } + } + + /* return */ + gst_data_unref (event); + return ret; +} + diff --git a/gst/gstqueue.h b/gst/gstqueue.h index 06485edbc1..48be10adf4 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -53,6 +53,13 @@ enum { GST_QUEUE_LEAK_DOWNSTREAM = 2 }; +enum { + GST_QUEUE_NEED_NOTHING = 0x0, + GST_QUEUE_NEED_DISCONTINUOUS = 0x1, + GST_QUEUE_NEED_EOS = 0x3, + GST_QUEUE_NEED_NEWMEDIA = 0x7, +}; + typedef struct _GstQueue GstQueue; typedef struct _GstQueueClass GstQueueClass; @@ -63,27 +70,21 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - - guint level_buffers; /* number of buffers queued here */ - guint level_bytes; /* number of bytes queued here */ - guint64 level_time; /* amount of time queued here */ - - guint size_buffers; /* size of queue in buffers */ - guint size_bytes; /* size of queue in bytes */ - guint64 size_time; /* size of queue in time */ + GAsyncQueue *queue; + GCond *not_full; /* signals space now available for writing */ + /* lock queue before changing next two values */ + guint8 need_src; /* we need this event before pushing the next buffer */ + guint8 need_sink; /* we need this event before popping the next buffer */ + gint size; /* size of queue in buffers */ gint leaky; /* whether the queue is leaky, and if so at which end */ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ - - GMutex *qlock; /* lock for queue (vs object lock) */ - /* we are single reader and single writer queue */ - gboolean reader; /* reader waiting on empty queue */ - gboolean writer; /* writer waiting on full queue */ - GCond *not_empty; /* signals buffers now available for reading */ - GCond *not_full; /* signals space now available for writing */ - - GTimeVal *timeval; /* the timeout for the queue locking */ + gulong max_wait; /* the timeout for locking in microseconds */ + + GstData *upstream_event; + gpointer upstream_return; + GMutex *upstream_mutex; + GCond *upstream_cond; }; struct _GstQueueClass { diff --git a/gst/gstthread.c b/gst/gstthread.c index fb54425cd7..84b785541d 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -170,7 +170,6 @@ gst_thread_dispose (GObject *object) G_OBJECT_CLASS (parent_class)->dispose (object); if (GST_ELEMENT_SCHED (thread)) { - gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread))); gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread))); } } @@ -338,11 +337,8 @@ gst_thread_change_state (GstElement * element) * FIXME also make this more efficient by keeping list of managed queues */ THR_DEBUG ("waking queue \"%s\"\n", GST_ELEMENT_NAME (element)); - g_mutex_lock (queue->qlock); GST_STATE_PENDING (element) = GST_STATE_PAUSED; g_cond_signal (queue->not_full); - g_cond_signal (queue->not_empty); - g_mutex_unlock (queue->qlock); } else { GList *pads = GST_ELEMENT_PADS (element); @@ -376,10 +372,7 @@ gst_thread_change_state (GstElement * element) THR_DEBUG (" element \"%s\" has pad cross sched boundary\n", GST_ELEMENT_NAME (element)); /* FIXME!! */ - g_mutex_lock (queue->qlock); g_cond_signal (queue->not_full); - g_cond_signal (queue->not_empty); - g_mutex_unlock (queue->qlock); } } } diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c index f1286314f9..79d1274cf4 100644 --- a/gst/schedulers/gstbasicscheduler.c +++ b/gst/schedulers/gstbasicscheduler.c @@ -89,8 +89,8 @@ struct _GstBasicScheduler { GList *chains; gint num_chains; - GMutex *event_lock; - GList *event_queue; + GAsyncQueue *event_queue; + gboolean has_events; /* speed up knowing if events are waiting - must only be set when event_queue is locked */ GstBasicSchedulerState state; }; @@ -124,7 +124,7 @@ static GstSchedulerState gst_basic_scheduler_iterate (GstScheduler *sched); static void gst_basic_scheduler_insert_event (GstScheduler *sched, GstPad *pad, GstData *event); static void gst_basic_scheduler_handle_events (GstBasicScheduler *sched); -#define gst_basic_scheduler_events(sched) G_STMT_START{ if ((sched) && (GST_BASIC_SCHEDULER (sched)->event_queue)) \ +#define gst_basic_scheduler_events(sched) G_STMT_START{ if ((sched) && (GST_BASIC_SCHEDULER (sched)->has_events)) \ gst_basic_scheduler_handle_events(GST_BASIC_SCHEDULER (sched)); }G_STMT_END static void gst_basic_scheduler_show (GstScheduler *sched); @@ -194,26 +194,24 @@ gst_basic_scheduler_init (GstBasicScheduler *scheduler) scheduler->num_elements = 0; scheduler->chains = NULL; scheduler->num_chains = 0; - scheduler->event_lock = g_mutex_new(); - scheduler->event_queue = NULL; + scheduler->event_queue = g_async_queue_new (); + scheduler->has_events = FALSE; } static void gst_basic_scheduler_dispose (GObject *object) { + GstBasicSchedulerEvent *ev; GstBasicScheduler *sched = GST_BASIC_SCHEDULER (object); - GList *list; - g_mutex_free (sched->event_lock); - list = sched->event_queue; - while (list) + g_async_queue_lock (sched->event_queue); + while ((ev = (GstBasicSchedulerEvent *) g_async_queue_try_pop_unlocked (sched->event_queue)) != NULL) { - GstBasicSchedulerEvent *ev = (GstBasicSchedulerEvent *) list->data; gst_data_unref (GST_DATA (ev->event)); g_free (ev); - list = g_list_next (list); } - g_list_free (sched->event_queue); + g_async_queue_unlock (sched->event_queue); + g_async_queue_unref (sched->event_queue); G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -1051,10 +1049,7 @@ gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element) if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) { GstElement *entry = GST_ELEMENT (cothread_get_private (cothread_current ())); - if (entry == element) { - g_warning ("removing currently running element! %s", GST_ELEMENT_NAME (entry)); - } - else if (entry) { + if (entry && entry != element) { GST_INFO (GST_CAT_SCHEDULING, "moving stopping to element \"%s\"", GST_ELEMENT_NAME (entry)); GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); @@ -1385,16 +1380,17 @@ gst_basic_scheduler_handle_events (GstBasicScheduler *sched) { GstBasicSchedulerEvent *ev; - while (sched->event_queue) + g_async_queue_lock (sched->event_queue); + while ((ev = (GstBasicSchedulerEvent *) g_async_queue_try_pop_unlocked (sched->event_queue)) != NULL) { + g_async_queue_unlock (sched->event_queue); GST_DEBUG (GST_CAT_DATAFLOW, "handling asynchronous event"); - g_mutex_lock (sched->event_lock); - ev = sched->event_queue->data; - sched->event_queue = g_list_delete_link (sched->event_queue, sched->event_queue); - g_mutex_unlock (sched->event_lock); gst_pad_send_event (ev->pad, ev->event); g_free (ev); + g_async_queue_lock (sched->event_queue); } + sched->has_events = FALSE; + g_async_queue_unlock (sched->event_queue); } static void @@ -1404,9 +1400,10 @@ gst_basic_scheduler_insert_event (GstScheduler *scheduler, GstPad *pad, GstData GstBasicSchedulerEvent *ev = g_new (GstBasicSchedulerEvent, 1); ev->pad = pad; ev->event = event; - g_mutex_lock (sched->event_lock); - sched->event_queue = g_list_prepend (sched->event_queue, ev); - g_mutex_unlock (sched->event_lock); + g_async_queue_lock (sched->event_queue); + g_async_queue_push_unlocked (sched->event_queue, ev); + sched->has_events = TRUE; + g_async_queue_unlock (sched->event_queue); } static void diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index c60803a110..9243490b44 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -56,18 +56,39 @@ enum { enum { ARG_0, - ARG_LEVEL_BUFFERS, - ARG_LEVEL_BYTES, - ARG_LEVEL_TIME, - ARG_SIZE_BUFFERS, - ARG_SIZE_BYTES, - ARG_SIZE_TIME, ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, ARG_MAY_DEADLOCK, + ARG_MAX_WAIT, }; +/* FIXME: I don't want to copy from glib */ +struct _GAsyncQueue +{ + GMutex *mutex; + GCond *cond; + GQueue *queue; + guint waiting_threads; + guint ref_count; +}; + +static gpointer +g_async_queue_peek_unlocked (GAsyncQueue *queue) +{ + return g_queue_peek_tail (queue->queue); +} +/* static gpointer +g_async_queue_peek (GAsyncQueue *queue) +{ + gpointer retval; + + g_mutex_lock (queue->mutex); + retval = g_async_queue_peek_unlocked (queue); + g_mutex_unlock (queue->mutex); + + return retval; +}*/ static void gst_queue_class_init (GstQueueClass *klass); static void gst_queue_init (GstQueue *queue); @@ -79,12 +100,13 @@ static void gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_queue_chain (GstPad *pad, GstData *buf); -static GstBuffer * gst_queue_get (GstPad *pad); +static GstData * gst_queue_get (GstPad *pad); static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad); +static gpointer gst_queue_upstream_event (GstPad *pad, GstData *event); static void gst_queue_locked_flush (GstQueue *queue); -/*static void gst_queue_flush (GstQueue *queue); -*/ +static void gst_queue_flush (GstQueue *queue); + static GstElementStateReturn gst_queue_change_state (GstElement *element); @@ -148,11 +170,14 @@ gst_queue_class_init (GstQueueClass *klass) g_param_spec_int ("level", "Level", "How many buffers are in the queue.", 0, G_MAXINT, 0, G_PARAM_READABLE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, - g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", - 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_param_spec_int ("max-level", "Maximum Level", "How many buffers the queue holds.", + 1, G_MAXINT, 100, G_PARAM_READWRITE)); g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, - g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", + g_param_spec_boolean ("may-deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", TRUE, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_WAIT, + g_param_spec_ulong ("max-wait", "Max Wait", "How long the queue will wait", + 1, 10 * 1000 * 1000, 10 * 1000, G_PARAM_READWRITE)); gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); @@ -208,23 +233,22 @@ gst_queue_init (GstQueue *queue) gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad); gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect)); gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps)); - + gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_upstream_event)); + queue->leaky = GST_QUEUE_NO_LEAK; - queue->queue = NULL; - queue->level_buffers = 0; - queue->level_bytes = 0; - queue->level_time = 0LL; - queue->size_buffers = 100; /* 100 buffers */ - queue->size_bytes = 100 * 1024; /* 100KB */ - queue->size_time = 1000000000LL; /* 1sec */ + queue->queue = g_async_queue_new (); + queue->not_full = g_cond_new (); + queue->need_src = GST_QUEUE_NEED_NOTHING; + queue->need_sink = GST_QUEUE_NEED_NOTHING; + + queue->size = 100; /* 100 buffers */ queue->may_deadlock = TRUE; + queue->max_wait = 10 * 1000; /* wait max 10 milliseconds */ - queue->qlock = g_mutex_new (); - queue->reader = FALSE; - queue->writer = FALSE; - queue->not_empty = g_cond_new (); - queue->not_full = g_cond_new (); - GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); + queue->upstream_event = NULL; + queue->upstream_return = NULL; + queue->upstream_mutex = g_mutex_new(); + queue->upstream_cond = g_cond_new(); } static void @@ -232,9 +256,14 @@ gst_queue_dispose (GObject *object) { GstQueue *queue = GST_QUEUE (object); - g_mutex_free (queue->qlock); - g_cond_free (queue->not_empty); + if (queue->upstream_event || queue->upstream_return) + g_warning ("losing event while disposing queue"); + g_cond_free (queue->upstream_cond); + g_mutex_free (queue->upstream_mutex); + g_cond_free (queue->not_full); + g_async_queue_unref (queue->queue); + G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -250,39 +279,42 @@ gst_queue_get_bufferpool (GstPad *pad) } static void -gst_queue_cleanup_buffers (gpointer data, const gpointer user_data) -{ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data); - - gst_data_unref (GST_DATA (data)); -} - -static void gst_queue_locked_flush (GstQueue *queue) { - g_list_foreach (queue->queue, gst_queue_cleanup_buffers, - (gpointer) queue); - g_list_free (queue->queue); - - queue->queue = NULL; - queue->level_buffers = 0; - queue->timeval = NULL; + GstData *data; + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "cleaning queue\n"); + while ((data = GST_DATA (g_async_queue_try_pop_unlocked (queue->queue))) != NULL) + { + switch (GST_DATA_TYPE (data)) + { + case GST_EVENT_NEWMEDIA: + queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA; + break; + case GST_EVENT_EOS: + queue->need_sink |= GST_QUEUE_NEED_EOS; + break; + default: + queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS; + break; + } + gst_data_unref (data); + } + queue->need_sink |= queue->need_src; + queue->need_src = GST_QUEUE_NEED_NOTHING; + g_cond_signal (queue->not_full); } -/* warning: `gst_queue_flush' defined but not used static void gst_queue_flush (GstQueue *queue) { - g_mutex_lock (queue->qlock); + g_async_queue_lock (queue->queue); gst_queue_locked_flush (queue); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); } -*/ - static void gst_queue_chain (GstPad *pad, GstData *buf) { GstQueue *queue; - gboolean reader; g_return_if_fail (pad != NULL); g_return_if_fail (GST_IS_PAD (pad)); @@ -291,134 +323,136 @@ gst_queue_chain (GstPad *pad, GstData *buf) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); restart: - /* we have to lock the queue since we span threads */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); - g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ()); - if (GST_IS_EVENT (buf)) { - switch (GST_DATA_TYPE (buf)) { - case GST_EVENT_FLUSH: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n"); - gst_queue_locked_flush (queue); + /* handle events */ + if (queue->upstream_event) + { + g_print ("handling event\n"); + g_mutex_lock (queue->upstream_mutex); + GstPad *peer = GST_PAD_PEER (pad); + + if (peer) + { + queue->upstream_return = gst_pad_send_event (peer, queue->upstream_event); + } else { + gst_data_unref (queue->upstream_event); + } + queue->upstream_event = NULL; + g_cond_signal (queue->upstream_cond); + g_mutex_unlock (queue->upstream_mutex); + g_print ("done handling event\n"); + } + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n", buf, GST_BUFFER_SIZE(buf)); + + /* if we leak on the upstream side, drop the current buffer */ + if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM && g_async_queue_length (queue->queue) >= queue->size) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); + /* check if we have to schedule an event for when the stream gets empty */ + switch (GST_DATA_TYPE (buf)) + { + case GST_EVENT_NEWMEDIA: + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA; + g_async_queue_unlock (queue->queue); + break; + case GST_EVENT_DISCONTINUOUS: + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS; + g_async_queue_unlock (queue->queue); break; case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n", - GST_ELEMENT_NAME (queue), queue->level_buffers); + g_async_queue_lock (queue->queue); + queue->need_sink |= GST_QUEUE_NEED_EOS; + g_async_queue_unlock (queue->queue); break; default: - /* gst_pad_event_default (pad, GST_EVENT (buf));*/ break; } - } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf)); - - if (queue->level_buffers == queue->size_buffers) { - /* if this is a leaky queue... */ - if (queue->leaky) { - /* FIXME don't want to leak events! */ - /* if we leak on the upstream side, drop the current buffer */ - if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - if (GST_IS_EVENT (buf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_DATA_TYPE(buf)); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n"); - gst_data_unref(buf); - /* now we have to clean up and exit right away */ - g_mutex_unlock (queue->qlock); - return; - } - /* otherwise we have to push a buffer off the other end */ - else { - GList *front; - GstData *leakbuf; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); - front = queue->queue; - leakbuf = (GstData *)(front->data); - if (GST_IS_EVENT (leakbuf)) - fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n", - GST_ELEMENT_NAME(GST_ELEMENT(queue)), - GST_DATA_TYPE(leakbuf)); - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(leakbuf); + gst_data_unref(buf); + /* now we have to exit right away */ + return; + } + else if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM) + { + /* while there are too much buffers */ + while (g_async_queue_length (queue->queue) >= queue->size) + { + GstData *leakbuf; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n"); + g_async_queue_lock (queue->queue); + leakbuf = (GstData *) g_async_queue_try_pop_unlocked (queue->queue); + if (leakbuf != NULL) + { + switch (GST_DATA_TYPE (buf)) + { + case GST_EVENT_NEWMEDIA: + queue->need_src |= GST_QUEUE_NEED_NEWMEDIA; + break; + case GST_EVENT_EOS: + queue->need_src |= GST_QUEUE_NEED_EOS; + break; + default: + queue->need_src |= GST_QUEUE_NEED_DISCONTINUOUS; + break; + } gst_data_unref (leakbuf); - queue->queue = g_list_remove_link (queue->queue, front); - g_list_free (front); } + g_async_queue_unlock (queue->queue); } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == queue->size_buffers) { - /* if there's a pending state change for this queue or its manager, switch */ - /* back to iterator so bottom half of state change executes */ - while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); - g_mutex_unlock (queue->qlock); - if (gst_element_interrupt (GST_ELEMENT (queue))) - return; - goto restart; - } - if (GST_STATE (queue) != GST_STATE_PLAYING) { - /* this means the other end is shut down */ - /* try to signal to resolve the error */ - if (!queue->may_deadlock) { - gst_data_unref (buf); - g_mutex_unlock (queue->qlock); - gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); - return; - } - else { - gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); - } + } + + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n", + g_async_queue_length (queue->queue), queue->size); + /* while there isn't enough space available */ + if (g_async_queue_length (queue->queue) >= queue->size) + { + /* if there's a pending state change for this queue or its manager, switch */ + /* back to iterator so bottom half of state change executes */ + while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return; + goto restart; + } + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + /* try to signal to resolve the error */ + if (!queue->may_deadlock) { + gst_data_unref (buf); + gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); + return; + } else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); } - - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->writer) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n"); - queue->writer = TRUE; - g_cond_wait (queue->not_full, queue->qlock); - queue->writer = FALSE; - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", - queue->level_buffers, queue->size_buffers); + + /* FIXME: we're poking in AsyncQueue internals here */ + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size); + g_print ("waiting for not null\n"); + g_mutex_lock (queue->upstream_mutex); + if (queue->upstream_event == NULL) + g_cond_wait (queue->not_full, queue->upstream_mutex); + g_mutex_unlock (queue->upstream_mutex); + g_print ("not null\n"); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n"); + goto restart; } + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size); /* put the buffer on the tail of the list */ - queue->queue = g_list_append (queue->queue, buf); - queue->level_buffers++; - queue->level_bytes += GST_BUFFER_SIZE(buf); + g_async_queue_push (queue->queue, buf); GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n", - GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); + GST_DEBUG_PAD_NAME(pad), g_async_queue_length (queue->queue), queue->size); - /* this assertion _has_ to hold */ - /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */ - - /* reader waiting on an empty queue */ - reader = queue->reader; - - g_mutex_unlock (queue->qlock); - - if (reader) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n"); - g_cond_signal (queue->not_empty); - } } -static GstBuffer * +static GstData * gst_queue_get (GstPad *pad) { GstQueue *queue; - GstBuffer *buf = NULL; - GList *front; - gboolean writer; + GstData *data; g_assert(pad != NULL); g_assert(GST_IS_PAD(pad)); @@ -428,27 +462,58 @@ gst_queue_get (GstPad *pad) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); restart: - /* have to lock for thread-safety */ - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); - g_mutex_lock (queue->qlock); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty); + + /* make sure we're not in the middle of a state change */ + while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return NULL; + } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - while (queue->level_buffers == 0) { + g_async_queue_lock (queue->queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + if (queue->need_src) + { + GstData *event; + switch (queue->need_src) + { + case GST_QUEUE_NEED_NEWMEDIA: + event = GST_DATA (gst_event_new_newmedia ()); + break; + case GST_QUEUE_NEED_EOS: + event = GST_DATA (gst_event_new_eos ()); + break; + case GST_QUEUE_NEED_DISCONTINUOUS: + event = GST_DATA (gst_event_new_discontinuous ()); + break; + default: + event = NULL; + g_assert_not_reached (); + break; + } + data = g_async_queue_peek_unlocked (queue->queue); + if (data) + { + guint i; + for (i = 0; i < GST_OFFSET_TYPES; i++) + { + event->offset[i] = data->offset[i]; + } + } + queue->need_src = GST_QUEUE_NEED_NOTHING; + g_async_queue_unlock (queue->queue); + return event; + } + if ((data = g_async_queue_pop_unlocked (queue->queue)) == NULL) + { + GTimeVal timeval; /* if there's a pending state change for this queue or its manager, switch * back to iterator so bottom half of state change executes */ - while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); - g_mutex_unlock (queue->qlock); - if (gst_element_interrupt (GST_ELEMENT (queue))) - return NULL; - goto restart; - } if (GST_STATE (queue) != GST_STATE_PLAYING) { /* this means the other end is shut down */ if (!queue->may_deadlock) { - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); goto restart; } @@ -457,56 +522,37 @@ restart: } } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - if (queue->reader) - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n"); - queue->reader = TRUE; - g_cond_wait (queue->not_empty, queue->qlock); - queue->reader = FALSE; + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + g_get_current_time (&timeval); + g_time_val_add (&timeval, queue->max_wait); + g_print ("waiting for data\n"); + data = g_async_queue_timed_pop_unlocked (queue->queue, &timeval); + g_print ("%s\n", data == NULL ? "timer goes off while waiting for data\n" : "data available\n"); + if (data == NULL) + { + g_async_queue_unlock (queue->queue); + gst_element_yield (GST_ELEMENT (data)); + goto restart; + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n"); } - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers); - - front = queue->queue; - buf = (GstBuffer *)(front->data); - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf); - queue->queue = g_list_remove_link (queue->queue, front); - g_list_free (front); - - queue->level_buffers--; - queue->level_bytes -= GST_BUFFER_SIZE(buf); - + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size); + g_async_queue_unlock (queue->queue); + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n", GST_DEBUG_PAD_NAME(pad), - queue->level_buffers, queue->size_buffers); - - /* this assertion _has_ to hold */ - /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */ + g_async_queue_length (queue->queue), queue->size); - /* writer waiting on a full queue */ - writer = queue->writer; - - g_mutex_unlock (queue->qlock); - - if (writer) - { - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n"); - g_cond_signal (queue->not_full); - } + g_cond_signal (queue->not_full); /* FIXME where should this be? locked? */ - if (GST_IS_EVENT(buf)) { - switch (GST_DATA_TYPE(buf)) { - case GST_EVENT_EOS: - GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue)); - gst_element_set_eos (GST_ELEMENT (queue)); - break; - default: - break; - } + if (GST_DATA_TYPE(data) == GST_EVENT_EOS) + { + GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue)); + gst_element_set_eos (GST_ELEMENT (queue)); } - return buf; + return data; } static GstElementStateReturn @@ -524,13 +570,12 @@ gst_queue_change_state (GstElement *element) /* lock the queue so another thread (not in sync with this thread's state) * can't call this queue's _get (or whatever) */ - g_mutex_lock (queue->qlock); + g_async_queue_lock (queue->queue); new_state = GST_STATE_PENDING (element); if (new_state == GST_STATE_PAUSED) { - //g_cond_signal (queue->not_full); - //g_cond_signal (queue->not_empty); + /* g_cond_signal (queue->not_full); */ } else if (new_state == GST_STATE_READY) { gst_queue_locked_flush (queue); @@ -538,16 +583,14 @@ gst_queue_change_state (GstElement *element) else if (new_state == GST_STATE_PLAYING) { if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) { /* FIXME can this be? */ - if (queue->reader) - g_cond_signal (queue->not_empty); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); return GST_STATE_FAILURE; } } ret = GST_ELEMENT_CLASS (parent_class)->change_state (element); - g_mutex_unlock (queue->qlock); + g_async_queue_unlock (queue->queue); GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element)); return ret; @@ -567,12 +610,19 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa switch (prop_id) { case ARG_LEAKY: queue->leaky = g_value_get_enum (value); + g_object_notify (object, "leaky"); break; case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int (value); + queue->size = g_value_get_int (value); + g_object_notify (object, "max-level"); break; case ARG_MAY_DEADLOCK: queue->may_deadlock = g_value_get_boolean (value); + g_object_notify (object, "may-deadlock"); + break; + case ARG_MAX_WAIT: + queue->max_wait = g_value_get_ulong (value); + g_object_notify (object, "max-wait"); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -595,16 +645,67 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe g_value_set_enum (value, queue->leaky); break; case ARG_LEVEL: - g_value_set_int (value, queue->level_buffers); + g_value_set_int (value, g_async_queue_length (queue->queue)); break; case ARG_MAX_LEVEL: - g_value_set_int (value, queue->size_buffers); + g_value_set_int (value, queue->size); break; case ARG_MAY_DEADLOCK: g_value_set_boolean (value, queue->may_deadlock); break; + case ARG_MAX_WAIT: + g_value_set_ulong (value, queue->max_wait); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); break; } } +static gpointer +gst_queue_upstream_event (GstPad *pad, GstData *event) +{ + GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad)); + gpointer *ret = NULL; + + /* check that everything is ok. */ + if (queue->upstream_event || queue->upstream_return) + { + g_warning ("unhandled event in queue."); + queue->upstream_return = NULL; + } + + /* transfer event */ + g_mutex_lock (queue->upstream_mutex); + gst_data_ref (event); + queue->upstream_event = event; + g_cond_signal (queue->not_full); + g_print ("waiting for handling of upstream event\n"); + while (queue->upstream_event != NULL) + g_cond_wait (queue->upstream_cond, queue->upstream_mutex); + g_print ("upstream event handled\n"); + ret = queue->upstream_return; + queue->upstream_return = NULL; + g_mutex_unlock (queue->upstream_mutex); + + /* handle event */ + if (ret != NULL) + { + switch (GST_DATA_TYPE (event)) + { + case GST_EVENT_SEEK: + if (GST_EVENT_SEEK_FLUSH(event)) + gst_queue_flush (queue); + break; + case GST_EVENT_FLUSH: + gst_queue_flush (queue); + break; + default: + break; + } + } + + /* return */ + gst_data_unref (event); + return ret; +} + diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 06485edbc1..48be10adf4 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -53,6 +53,13 @@ enum { GST_QUEUE_LEAK_DOWNSTREAM = 2 }; +enum { + GST_QUEUE_NEED_NOTHING = 0x0, + GST_QUEUE_NEED_DISCONTINUOUS = 0x1, + GST_QUEUE_NEED_EOS = 0x3, + GST_QUEUE_NEED_NEWMEDIA = 0x7, +}; + typedef struct _GstQueue GstQueue; typedef struct _GstQueueClass GstQueueClass; @@ -63,27 +70,21 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - - guint level_buffers; /* number of buffers queued here */ - guint level_bytes; /* number of bytes queued here */ - guint64 level_time; /* amount of time queued here */ - - guint size_buffers; /* size of queue in buffers */ - guint size_bytes; /* size of queue in bytes */ - guint64 size_time; /* size of queue in time */ + GAsyncQueue *queue; + GCond *not_full; /* signals space now available for writing */ + /* lock queue before changing next two values */ + guint8 need_src; /* we need this event before pushing the next buffer */ + guint8 need_sink; /* we need this event before popping the next buffer */ + gint size; /* size of queue in buffers */ gint leaky; /* whether the queue is leaky, and if so at which end */ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ - - GMutex *qlock; /* lock for queue (vs object lock) */ - /* we are single reader and single writer queue */ - gboolean reader; /* reader waiting on empty queue */ - gboolean writer; /* writer waiting on full queue */ - GCond *not_empty; /* signals buffers now available for reading */ - GCond *not_full; /* signals space now available for writing */ - - GTimeVal *timeval; /* the timeout for the queue locking */ + gulong max_wait; /* the timeout for locking in microseconds */ + + GstData *upstream_event; + gpointer upstream_return; + GMutex *upstream_mutex; + GCond *upstream_cond; }; struct _GstQueueClass { |