summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Otte <otte@gnome.org>2002-03-17 00:50:42 +0000
committerBenjamin Otte <otte@gnome.org>2002-03-17 00:50:42 +0000
commit0aa86db75c7c0894432caa17971bd629506d1346 (patch)
tree35042b293dc601b4f7aed3b45ac044b9cee10632
parentdf68257f9499fdaafbc57a0a09585a66458182e9 (diff)
downloadgstreamer-0aa86db75c7c0894432caa17971bd629506d1346.tar.gz
- rewrite of GstQueue, hope it works now, though I'm not absolutely sure there couldn'T be a deadlock
Original commit message from CVS: - rewrite of GstQueue, hope it works now, though I'm not absolutely sure there couldn'T be a deadlock - fixes to GstThread (this thing has to work without calling queues\!) - changes to scheduler (use GAsyncQueue for events)
-rw-r--r--gst/gstdata.c2
-rw-r--r--gst/gstdata.h4
-rw-r--r--gst/gstevent.c4
-rw-r--r--gst/gstevent.h1
-rw-r--r--gst/gstqueue.c539
-rw-r--r--gst/gstqueue.h37
-rw-r--r--gst/gstthread.c7
-rw-r--r--gst/schedulers/gstbasicscheduler.c45
-rw-r--r--plugins/elements/gstqueue.c539
-rw-r--r--plugins/elements/gstqueue.h37
10 files changed, 706 insertions, 509 deletions
diff --git a/gst/gstdata.c b/gst/gstdata.c
index b36c3faa94..9fb6aefde3 100644
--- a/gst/gstdata.c
+++ b/gst/gstdata.c
@@ -22,7 +22,7 @@ gst_data_init (GstData *data)
data->flags = 0;
for (i = 0; i < GST_OFFSET_TYPES; i++)
{
- data->offset[i] = 0;
+ data->offset[i] = GST_OFFSET_INVALID;
}
}
void
diff --git a/gst/gstdata.h b/gst/gstdata.h
index 6c922c9384..759d5186cd 100644
--- a/gst/gstdata.h
+++ b/gst/gstdata.h
@@ -65,7 +65,9 @@ typedef enum {
} GstDataType;
/* number of types */
-#define GST_OFFSET_TYPES 3
+#define GST_OFFSET_TYPES 3 /* number of different GstOffsetType types */
+#define GST_OFFSET_INVALID (~((guint64) 0)) /* invalid (or unitialized) offset */
+#define GST_OFFSET_IS_INVLAID(ofset) (offset == GST_OFFSET_INVALID)
typedef enum {
GST_OFFSET_TIME = 0,
GST_OFFSET_BYTES = 1,
diff --git a/gst/gstevent.c b/gst/gstevent.c
index 4e301c815d..231e15aab5 100644
--- a/gst/gstevent.c
+++ b/gst/gstevent.c
@@ -129,7 +129,7 @@ gst_event_seek_init (GstEventSeek *event, GstSeekType type, GstOffsetType offset
event->original = offset_type;
for (i = 0; i < GST_OFFSET_TYPES; i++)
{
- event->offset[i] = 0;
+ event->offset[i] = GST_OFFSET_INVALID;
event->accuracy[i] = GST_ACCURACY_NONE;
}
event->flush = TRUE;
@@ -275,7 +275,7 @@ gst_event_length_init (GstEventLength *event)
for (i = 0; i < GST_OFFSET_TYPES; i++)
{
event->accuracy[i] = GST_ACCURACY_NONE;
- event->length[i] = 0;
+ event->length[i] = GST_OFFSET_INVALID;
}
}
/**
diff --git a/gst/gstevent.h b/gst/gstevent.h
index febcd8a153..eaf92ab3dc 100644
--- a/gst/gstevent.h
+++ b/gst/gstevent.h
@@ -58,6 +58,7 @@ typedef enum {
#define GST_EVENT_UNLOCK(event) ((GstEventUnLock *) (event))
#define GST_EVENT_SEEK_TYPE(event) (GST_EVENT_SEEK(event)->type)
+#define GST_EVENT_SEEK_FLUSH(event) (GST_EVENT_SEEK(event)->flush)
typedef struct _GstEventEOS GstEventEOS;
typedef struct _GstEventDiscontinuous GstEventDiscontinuous;
diff --git a/gst/gstqueue.c b/gst/gstqueue.c
index c60803a110..9243490b44 100644
--- a/gst/gstqueue.c
+++ b/gst/gstqueue.c
@@ -56,18 +56,39 @@ enum {
enum {
ARG_0,
- ARG_LEVEL_BUFFERS,
- ARG_LEVEL_BYTES,
- ARG_LEVEL_TIME,
- ARG_SIZE_BUFFERS,
- ARG_SIZE_BYTES,
- ARG_SIZE_TIME,
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MAY_DEADLOCK,
+ ARG_MAX_WAIT,
};
+/* FIXME: I don't want to copy from glib */
+struct _GAsyncQueue
+{
+ GMutex *mutex;
+ GCond *cond;
+ GQueue *queue;
+ guint waiting_threads;
+ guint ref_count;
+};
+
+static gpointer
+g_async_queue_peek_unlocked (GAsyncQueue *queue)
+{
+ return g_queue_peek_tail (queue->queue);
+}
+/* static gpointer
+g_async_queue_peek (GAsyncQueue *queue)
+{
+ gpointer retval;
+
+ g_mutex_lock (queue->mutex);
+ retval = g_async_queue_peek_unlocked (queue);
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}*/
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
@@ -79,12 +100,13 @@ static void gst_queue_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static void gst_queue_chain (GstPad *pad, GstData *buf);
-static GstBuffer * gst_queue_get (GstPad *pad);
+static GstData * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+static gpointer gst_queue_upstream_event (GstPad *pad, GstData *event);
static void gst_queue_locked_flush (GstQueue *queue);
-/*static void gst_queue_flush (GstQueue *queue);
-*/
+static void gst_queue_flush (GstQueue *queue);
+
static GstElementStateReturn gst_queue_change_state (GstElement *element);
@@ -148,11 +170,14 @@ gst_queue_class_init (GstQueueClass *klass)
g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
0, G_MAXINT, 0, G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
- g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
- 0, G_MAXINT, 100, G_PARAM_READWRITE));
+ g_param_spec_int ("max-level", "Maximum Level", "How many buffers the queue holds.",
+ 1, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
- g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+ g_param_spec_boolean ("may-deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_WAIT,
+ g_param_spec_ulong ("max-wait", "Max Wait", "How long the queue will wait",
+ 1, 10 * 1000 * 1000, 10 * 1000, G_PARAM_READWRITE));
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
@@ -208,23 +233,22 @@ gst_queue_init (GstQueue *queue)
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
-
+ gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_upstream_event));
+
queue->leaky = GST_QUEUE_NO_LEAK;
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = 0LL;
- queue->size_buffers = 100; /* 100 buffers */
- queue->size_bytes = 100 * 1024; /* 100KB */
- queue->size_time = 1000000000LL; /* 1sec */
+ queue->queue = g_async_queue_new ();
+ queue->not_full = g_cond_new ();
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ queue->need_sink = GST_QUEUE_NEED_NOTHING;
+
+ queue->size = 100; /* 100 buffers */
queue->may_deadlock = TRUE;
+ queue->max_wait = 10 * 1000; /* wait max 10 milliseconds */
- queue->qlock = g_mutex_new ();
- queue->reader = FALSE;
- queue->writer = FALSE;
- queue->not_empty = g_cond_new ();
- queue->not_full = g_cond_new ();
- GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
+ queue->upstream_event = NULL;
+ queue->upstream_return = NULL;
+ queue->upstream_mutex = g_mutex_new();
+ queue->upstream_cond = g_cond_new();
}
static void
@@ -232,9 +256,14 @@ gst_queue_dispose (GObject *object)
{
GstQueue *queue = GST_QUEUE (object);
- g_mutex_free (queue->qlock);
- g_cond_free (queue->not_empty);
+ if (queue->upstream_event || queue->upstream_return)
+ g_warning ("losing event while disposing queue");
+ g_cond_free (queue->upstream_cond);
+ g_mutex_free (queue->upstream_mutex);
+
g_cond_free (queue->not_full);
+ g_async_queue_unref (queue->queue);
+
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -250,39 +279,42 @@ gst_queue_get_bufferpool (GstPad *pad)
}
static void
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
-{
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
-
- gst_data_unref (GST_DATA (data));
-}
-
-static void
gst_queue_locked_flush (GstQueue *queue)
{
- g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
- (gpointer) queue);
- g_list_free (queue->queue);
-
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->timeval = NULL;
+ GstData *data;
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "cleaning queue\n");
+ while ((data = GST_DATA (g_async_queue_try_pop_unlocked (queue->queue))) != NULL)
+ {
+ switch (GST_DATA_TYPE (data))
+ {
+ case GST_EVENT_NEWMEDIA:
+ queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA;
+ break;
+ case GST_EVENT_EOS:
+ queue->need_sink |= GST_QUEUE_NEED_EOS;
+ break;
+ default:
+ queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS;
+ break;
+ }
+ gst_data_unref (data);
+ }
+ queue->need_sink |= queue->need_src;
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ g_cond_signal (queue->not_full);
}
-/* warning: `gst_queue_flush' defined but not used
static void
gst_queue_flush (GstQueue *queue)
{
- g_mutex_lock (queue->qlock);
+ g_async_queue_lock (queue->queue);
gst_queue_locked_flush (queue);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
}
-*/
-
static void
gst_queue_chain (GstPad *pad, GstData *buf)
{
GstQueue *queue;
- gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
@@ -291,134 +323,136 @@ gst_queue_chain (GstPad *pad, GstData *buf)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
- /* we have to lock the queue since we span threads */
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
- g_mutex_lock (queue->qlock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
- if (GST_IS_EVENT (buf)) {
- switch (GST_DATA_TYPE (buf)) {
- case GST_EVENT_FLUSH:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
- gst_queue_locked_flush (queue);
+ /* handle events */
+ if (queue->upstream_event)
+ {
+ g_print ("handling event\n");
+ g_mutex_lock (queue->upstream_mutex);
+ GstPad *peer = GST_PAD_PEER (pad);
+
+ if (peer)
+ {
+ queue->upstream_return = gst_pad_send_event (peer, queue->upstream_event);
+ } else {
+ gst_data_unref (queue->upstream_event);
+ }
+ queue->upstream_event = NULL;
+ g_cond_signal (queue->upstream_cond);
+ g_mutex_unlock (queue->upstream_mutex);
+ g_print ("done handling event\n");
+ }
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n", buf, GST_BUFFER_SIZE(buf));
+
+ /* if we leak on the upstream side, drop the current buffer */
+ if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM && g_async_queue_length (queue->queue) >= queue->size) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+ /* check if we have to schedule an event for when the stream gets empty */
+ switch (GST_DATA_TYPE (buf))
+ {
+ case GST_EVENT_NEWMEDIA:
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA;
+ g_async_queue_unlock (queue->queue);
+ break;
+ case GST_EVENT_DISCONTINUOUS:
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS;
+ g_async_queue_unlock (queue->queue);
break;
case GST_EVENT_EOS:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
- GST_ELEMENT_NAME (queue), queue->level_buffers);
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_EOS;
+ g_async_queue_unlock (queue->queue);
break;
default:
- /* gst_pad_event_default (pad, GST_EVENT (buf));*/
break;
}
- }
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
-
- if (queue->level_buffers == queue->size_buffers) {
- /* if this is a leaky queue... */
- if (queue->leaky) {
- /* FIXME don't want to leak events! */
- /* if we leak on the upstream side, drop the current buffer */
- if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
- if (GST_IS_EVENT (buf))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_DATA_TYPE(buf));
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
- gst_data_unref(buf);
- /* now we have to clean up and exit right away */
- g_mutex_unlock (queue->qlock);
- return;
- }
- /* otherwise we have to push a buffer off the other end */
- else {
- GList *front;
- GstData *leakbuf;
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
- front = queue->queue;
- leakbuf = (GstData *)(front->data);
- if (GST_IS_EVENT (leakbuf))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_DATA_TYPE(leakbuf));
- queue->level_buffers--;
- queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
+ gst_data_unref(buf);
+ /* now we have to exit right away */
+ return;
+ }
+ else if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM)
+ {
+ /* while there are too much buffers */
+ while (g_async_queue_length (queue->queue) >= queue->size)
+ {
+ GstData *leakbuf;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
+ g_async_queue_lock (queue->queue);
+ leakbuf = (GstData *) g_async_queue_try_pop_unlocked (queue->queue);
+ if (leakbuf != NULL)
+ {
+ switch (GST_DATA_TYPE (buf))
+ {
+ case GST_EVENT_NEWMEDIA:
+ queue->need_src |= GST_QUEUE_NEED_NEWMEDIA;
+ break;
+ case GST_EVENT_EOS:
+ queue->need_src |= GST_QUEUE_NEED_EOS;
+ break;
+ default:
+ queue->need_src |= GST_QUEUE_NEED_DISCONTINUOUS;
+ break;
+ }
gst_data_unref (leakbuf);
- queue->queue = g_list_remove_link (queue->queue, front);
- g_list_free (front);
}
+ g_async_queue_unlock (queue->queue);
}
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
- queue->level_buffers, queue->size_buffers);
- while (queue->level_buffers == queue->size_buffers) {
- /* if there's a pending state change for this queue or its manager, switch */
- /* back to iterator so bottom half of state change executes */
- while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
- g_mutex_unlock (queue->qlock);
- if (gst_element_interrupt (GST_ELEMENT (queue)))
- return;
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- /* try to signal to resolve the error */
- if (!queue->may_deadlock) {
- gst_data_unref (buf);
- g_mutex_unlock (queue->qlock);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
- return;
- }
- else {
- gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
- }
+ }
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+ g_async_queue_length (queue->queue), queue->size);
+ /* while there isn't enough space available */
+ if (g_async_queue_length (queue->queue) >= queue->size)
+ {
+ /* if there's a pending state change for this queue or its manager, switch */
+ /* back to iterator so bottom half of state change executes */
+ while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return;
+ goto restart;
+ }
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ /* try to signal to resolve the error */
+ if (!queue->may_deadlock) {
+ gst_data_unref (buf);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+ return;
+ } else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
}
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- if (queue->writer)
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
- queue->writer = TRUE;
- g_cond_wait (queue->not_full, queue->qlock);
- queue->writer = FALSE;
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
- queue->level_buffers, queue->size_buffers);
+
+ /* FIXME: we're poking in AsyncQueue internals here */
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size);
+ g_print ("waiting for not null\n");
+ g_mutex_lock (queue->upstream_mutex);
+ if (queue->upstream_event == NULL)
+ g_cond_wait (queue->not_full, queue->upstream_mutex);
+ g_mutex_unlock (queue->upstream_mutex);
+ g_print ("not null\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
+ goto restart;
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size);
/* put the buffer on the tail of the list */
- queue->queue = g_list_append (queue->queue, buf);
- queue->level_buffers++;
- queue->level_bytes += GST_BUFFER_SIZE(buf);
+ g_async_queue_push (queue->queue, buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers);
+ GST_DEBUG_PAD_NAME(pad), g_async_queue_length (queue->queue), queue->size);
- /* this assertion _has_ to hold */
- /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
-
- /* reader waiting on an empty queue */
- reader = queue->reader;
-
- g_mutex_unlock (queue->qlock);
-
- if (reader)
- {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
- g_cond_signal (queue->not_empty);
- }
}
-static GstBuffer *
+static GstData *
gst_queue_get (GstPad *pad)
{
GstQueue *queue;
- GstBuffer *buf = NULL;
- GList *front;
- gboolean writer;
+ GstData *data;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
@@ -428,27 +462,58 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
- /* have to lock for thread-safety */
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
- g_mutex_lock (queue->qlock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
+
+ /* make sure we're not in the middle of a state change */
+ while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return NULL;
+ }
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- while (queue->level_buffers == 0) {
+ g_async_queue_lock (queue->queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ if (queue->need_src)
+ {
+ GstData *event;
+ switch (queue->need_src)
+ {
+ case GST_QUEUE_NEED_NEWMEDIA:
+ event = GST_DATA (gst_event_new_newmedia ());
+ break;
+ case GST_QUEUE_NEED_EOS:
+ event = GST_DATA (gst_event_new_eos ());
+ break;
+ case GST_QUEUE_NEED_DISCONTINUOUS:
+ event = GST_DATA (gst_event_new_discontinuous ());
+ break;
+ default:
+ event = NULL;
+ g_assert_not_reached ();
+ break;
+ }
+ data = g_async_queue_peek_unlocked (queue->queue);
+ if (data)
+ {
+ guint i;
+ for (i = 0; i < GST_OFFSET_TYPES; i++)
+ {
+ event->offset[i] = data->offset[i];
+ }
+ }
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ g_async_queue_unlock (queue->queue);
+ return event;
+ }
+ if ((data = g_async_queue_pop_unlocked (queue->queue)) == NULL)
+ {
+ GTimeVal timeval;
/* if there's a pending state change for this queue or its manager, switch
* back to iterator so bottom half of state change executes
*/
- while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
- g_mutex_unlock (queue->qlock);
- if (gst_element_interrupt (GST_ELEMENT (queue)))
- return NULL;
- goto restart;
- }
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
if (!queue->may_deadlock) {
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
goto restart;
}
@@ -457,56 +522,37 @@ restart:
}
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- if (queue->reader)
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
- queue->reader = TRUE;
- g_cond_wait (queue->not_empty, queue->qlock);
- queue->reader = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ g_get_current_time (&timeval);
+ g_time_val_add (&timeval, queue->max_wait);
+ g_print ("waiting for data\n");
+ data = g_async_queue_timed_pop_unlocked (queue->queue, &timeval);
+ g_print ("%s\n", data == NULL ? "timer goes off while waiting for data\n" : "data available\n");
+ if (data == NULL)
+ {
+ g_async_queue_unlock (queue->queue);
+ gst_element_yield (GST_ELEMENT (data));
+ goto restart;
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
-
- front = queue->queue;
- buf = (GstBuffer *)(front->data);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
- queue->queue = g_list_remove_link (queue->queue, front);
- g_list_free (front);
-
- queue->level_buffers--;
- queue->level_bytes -= GST_BUFFER_SIZE(buf);
-
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ g_async_queue_unlock (queue->queue);
+
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers);
-
- /* this assertion _has_ to hold */
- /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
+ g_async_queue_length (queue->queue), queue->size);
- /* writer waiting on a full queue */
- writer = queue->writer;
-
- g_mutex_unlock (queue->qlock);
-
- if (writer)
- {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
- g_cond_signal (queue->not_full);
- }
+ g_cond_signal (queue->not_full);
/* FIXME where should this be? locked? */
- if (GST_IS_EVENT(buf)) {
- switch (GST_DATA_TYPE(buf)) {
- case GST_EVENT_EOS:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
- gst_element_set_eos (GST_ELEMENT (queue));
- break;
- default:
- break;
- }
+ if (GST_DATA_TYPE(data) == GST_EVENT_EOS)
+ {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
+ gst_element_set_eos (GST_ELEMENT (queue));
}
- return buf;
+ return data;
}
static GstElementStateReturn
@@ -524,13 +570,12 @@ gst_queue_change_state (GstElement *element)
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
*/
- g_mutex_lock (queue->qlock);
+ g_async_queue_lock (queue->queue);
new_state = GST_STATE_PENDING (element);
if (new_state == GST_STATE_PAUSED) {
- //g_cond_signal (queue->not_full);
- //g_cond_signal (queue->not_empty);
+ /* g_cond_signal (queue->not_full); */
}
else if (new_state == GST_STATE_READY) {
gst_queue_locked_flush (queue);
@@ -538,16 +583,14 @@ gst_queue_change_state (GstElement *element)
else if (new_state == GST_STATE_PLAYING) {
if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) {
/* FIXME can this be? */
- if (queue->reader)
- g_cond_signal (queue->not_empty);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
return GST_STATE_FAILURE;
}
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
return ret;
@@ -567,12 +610,19 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
switch (prop_id) {
case ARG_LEAKY:
queue->leaky = g_value_get_enum (value);
+ g_object_notify (object, "leaky");
break;
case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int (value);
+ queue->size = g_value_get_int (value);
+ g_object_notify (object, "max-level");
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
+ g_object_notify (object, "may-deadlock");
+ break;
+ case ARG_MAX_WAIT:
+ queue->max_wait = g_value_get_ulong (value);
+ g_object_notify (object, "max-wait");
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -595,16 +645,67 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
g_value_set_enum (value, queue->leaky);
break;
case ARG_LEVEL:
- g_value_set_int (value, queue->level_buffers);
+ g_value_set_int (value, g_async_queue_length (queue->queue));
break;
case ARG_MAX_LEVEL:
- g_value_set_int (value, queue->size_buffers);
+ g_value_set_int (value, queue->size);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
+ case ARG_MAX_WAIT:
+ g_value_set_ulong (value, queue->max_wait);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
+static gpointer
+gst_queue_upstream_event (GstPad *pad, GstData *event)
+{
+ GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ gpointer *ret = NULL;
+
+ /* check that everything is ok. */
+ if (queue->upstream_event || queue->upstream_return)
+ {
+ g_warning ("unhandled event in queue.");
+ queue->upstream_return = NULL;
+ }
+
+ /* transfer event */
+ g_mutex_lock (queue->upstream_mutex);
+ gst_data_ref (event);
+ queue->upstream_event = event;
+ g_cond_signal (queue->not_full);
+ g_print ("waiting for handling of upstream event\n");
+ while (queue->upstream_event != NULL)
+ g_cond_wait (queue->upstream_cond, queue->upstream_mutex);
+ g_print ("upstream event handled\n");
+ ret = queue->upstream_return;
+ queue->upstream_return = NULL;
+ g_mutex_unlock (queue->upstream_mutex);
+
+ /* handle event */
+ if (ret != NULL)
+ {
+ switch (GST_DATA_TYPE (event))
+ {
+ case GST_EVENT_SEEK:
+ if (GST_EVENT_SEEK_FLUSH(event))
+ gst_queue_flush (queue);
+ break;
+ case GST_EVENT_FLUSH:
+ gst_queue_flush (queue);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /* return */
+ gst_data_unref (event);
+ return ret;
+}
+
diff --git a/gst/gstqueue.h b/gst/gstqueue.h
index 06485edbc1..48be10adf4 100644
--- a/gst/gstqueue.h
+++ b/gst/gstqueue.h
@@ -53,6 +53,13 @@ enum {
GST_QUEUE_LEAK_DOWNSTREAM = 2
};
+enum {
+ GST_QUEUE_NEED_NOTHING = 0x0,
+ GST_QUEUE_NEED_DISCONTINUOUS = 0x1,
+ GST_QUEUE_NEED_EOS = 0x3,
+ GST_QUEUE_NEED_NEWMEDIA = 0x7,
+};
+
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueClass GstQueueClass;
@@ -63,27 +70,21 @@ struct _GstQueue {
GstPad *srcpad;
/* the queue of buffers we're keeping our grubby hands on */
- GList *queue;
-
- guint level_buffers; /* number of buffers queued here */
- guint level_bytes; /* number of bytes queued here */
- guint64 level_time; /* amount of time queued here */
-
- guint size_buffers; /* size of queue in buffers */
- guint size_bytes; /* size of queue in bytes */
- guint64 size_time; /* size of queue in time */
+ GAsyncQueue *queue;
+ GCond *not_full; /* signals space now available for writing */
+ /* lock queue before changing next two values */
+ guint8 need_src; /* we need this event before pushing the next buffer */
+ guint8 need_sink; /* we need this event before popping the next buffer */
+ gint size; /* size of queue in buffers */
gint leaky; /* whether the queue is leaky, and if so at which end */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
-
- GMutex *qlock; /* lock for queue (vs object lock) */
- /* we are single reader and single writer queue */
- gboolean reader; /* reader waiting on empty queue */
- gboolean writer; /* writer waiting on full queue */
- GCond *not_empty; /* signals buffers now available for reading */
- GCond *not_full; /* signals space now available for writing */
-
- GTimeVal *timeval; /* the timeout for the queue locking */
+ gulong max_wait; /* the timeout for locking in microseconds */
+
+ GstData *upstream_event;
+ gpointer upstream_return;
+ GMutex *upstream_mutex;
+ GCond *upstream_cond;
};
struct _GstQueueClass {
diff --git a/gst/gstthread.c b/gst/gstthread.c
index fb54425cd7..84b785541d 100644
--- a/gst/gstthread.c
+++ b/gst/gstthread.c
@@ -170,7 +170,6 @@ gst_thread_dispose (GObject *object)
G_OBJECT_CLASS (parent_class)->dispose (object);
if (GST_ELEMENT_SCHED (thread)) {
- gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
}
}
@@ -338,11 +337,8 @@ gst_thread_change_state (GstElement * element)
* FIXME also make this more efficient by keeping list of managed queues
*/
THR_DEBUG ("waking queue \"%s\"\n", GST_ELEMENT_NAME (element));
- g_mutex_lock (queue->qlock);
GST_STATE_PENDING (element) = GST_STATE_PAUSED;
g_cond_signal (queue->not_full);
- g_cond_signal (queue->not_empty);
- g_mutex_unlock (queue->qlock);
}
else {
GList *pads = GST_ELEMENT_PADS (element);
@@ -376,10 +372,7 @@ gst_thread_change_state (GstElement * element)
THR_DEBUG (" element \"%s\" has pad cross sched boundary\n", GST_ELEMENT_NAME (element));
/* FIXME!! */
- g_mutex_lock (queue->qlock);
g_cond_signal (queue->not_full);
- g_cond_signal (queue->not_empty);
- g_mutex_unlock (queue->qlock);
}
}
}
diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c
index f1286314f9..79d1274cf4 100644
--- a/gst/schedulers/gstbasicscheduler.c
+++ b/gst/schedulers/gstbasicscheduler.c
@@ -89,8 +89,8 @@ struct _GstBasicScheduler {
GList *chains;
gint num_chains;
- GMutex *event_lock;
- GList *event_queue;
+ GAsyncQueue *event_queue;
+ gboolean has_events; /* speed up knowing if events are waiting - must only be set when event_queue is locked */
GstBasicSchedulerState state;
};
@@ -124,7 +124,7 @@ static GstSchedulerState
gst_basic_scheduler_iterate (GstScheduler *sched);
static void gst_basic_scheduler_insert_event (GstScheduler *sched, GstPad *pad, GstData *event);
static void gst_basic_scheduler_handle_events (GstBasicScheduler *sched);
-#define gst_basic_scheduler_events(sched) G_STMT_START{ if ((sched) && (GST_BASIC_SCHEDULER (sched)->event_queue)) \
+#define gst_basic_scheduler_events(sched) G_STMT_START{ if ((sched) && (GST_BASIC_SCHEDULER (sched)->has_events)) \
gst_basic_scheduler_handle_events(GST_BASIC_SCHEDULER (sched)); }G_STMT_END
static void gst_basic_scheduler_show (GstScheduler *sched);
@@ -194,26 +194,24 @@ gst_basic_scheduler_init (GstBasicScheduler *scheduler)
scheduler->num_elements = 0;
scheduler->chains = NULL;
scheduler->num_chains = 0;
- scheduler->event_lock = g_mutex_new();
- scheduler->event_queue = NULL;
+ scheduler->event_queue = g_async_queue_new ();
+ scheduler->has_events = FALSE;
}
static void
gst_basic_scheduler_dispose (GObject *object)
{
+ GstBasicSchedulerEvent *ev;
GstBasicScheduler *sched = GST_BASIC_SCHEDULER (object);
- GList *list;
- g_mutex_free (sched->event_lock);
- list = sched->event_queue;
- while (list)
+ g_async_queue_lock (sched->event_queue);
+ while ((ev = (GstBasicSchedulerEvent *) g_async_queue_try_pop_unlocked (sched->event_queue)) != NULL)
{
- GstBasicSchedulerEvent *ev = (GstBasicSchedulerEvent *) list->data;
gst_data_unref (GST_DATA (ev->event));
g_free (ev);
- list = g_list_next (list);
}
- g_list_free (sched->event_queue);
+ g_async_queue_unlock (sched->event_queue);
+ g_async_queue_unref (sched->event_queue);
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -1051,10 +1049,7 @@ gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
GstElement *entry = GST_ELEMENT (cothread_get_private (cothread_current ()));
- if (entry == element) {
- g_warning ("removing currently running element! %s", GST_ELEMENT_NAME (entry));
- }
- else if (entry) {
+ if (entry && entry != element) {
GST_INFO (GST_CAT_SCHEDULING, "moving stopping to element \"%s\"",
GST_ELEMENT_NAME (entry));
GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
@@ -1385,16 +1380,17 @@ gst_basic_scheduler_handle_events (GstBasicScheduler *sched)
{
GstBasicSchedulerEvent *ev;
- while (sched->event_queue)
+ g_async_queue_lock (sched->event_queue);
+ while ((ev = (GstBasicSchedulerEvent *) g_async_queue_try_pop_unlocked (sched->event_queue)) != NULL)
{
+ g_async_queue_unlock (sched->event_queue);
GST_DEBUG (GST_CAT_DATAFLOW, "handling asynchronous event");
- g_mutex_lock (sched->event_lock);
- ev = sched->event_queue->data;
- sched->event_queue = g_list_delete_link (sched->event_queue, sched->event_queue);
- g_mutex_unlock (sched->event_lock);
gst_pad_send_event (ev->pad, ev->event);
g_free (ev);
+ g_async_queue_lock (sched->event_queue);
}
+ sched->has_events = FALSE;
+ g_async_queue_unlock (sched->event_queue);
}
static void
@@ -1404,9 +1400,10 @@ gst_basic_scheduler_insert_event (GstScheduler *scheduler, GstPad *pad, GstData
GstBasicSchedulerEvent *ev = g_new (GstBasicSchedulerEvent, 1);
ev->pad = pad;
ev->event = event;
- g_mutex_lock (sched->event_lock);
- sched->event_queue = g_list_prepend (sched->event_queue, ev);
- g_mutex_unlock (sched->event_lock);
+ g_async_queue_lock (sched->event_queue);
+ g_async_queue_push_unlocked (sched->event_queue, ev);
+ sched->has_events = TRUE;
+ g_async_queue_unlock (sched->event_queue);
}
static void
diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c
index c60803a110..9243490b44 100644
--- a/plugins/elements/gstqueue.c
+++ b/plugins/elements/gstqueue.c
@@ -56,18 +56,39 @@ enum {
enum {
ARG_0,
- ARG_LEVEL_BUFFERS,
- ARG_LEVEL_BYTES,
- ARG_LEVEL_TIME,
- ARG_SIZE_BUFFERS,
- ARG_SIZE_BYTES,
- ARG_SIZE_TIME,
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
ARG_MAY_DEADLOCK,
+ ARG_MAX_WAIT,
};
+/* FIXME: I don't want to copy from glib */
+struct _GAsyncQueue
+{
+ GMutex *mutex;
+ GCond *cond;
+ GQueue *queue;
+ guint waiting_threads;
+ guint ref_count;
+};
+
+static gpointer
+g_async_queue_peek_unlocked (GAsyncQueue *queue)
+{
+ return g_queue_peek_tail (queue->queue);
+}
+/* static gpointer
+g_async_queue_peek (GAsyncQueue *queue)
+{
+ gpointer retval;
+
+ g_mutex_lock (queue->mutex);
+ retval = g_async_queue_peek_unlocked (queue);
+ g_mutex_unlock (queue->mutex);
+
+ return retval;
+}*/
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
@@ -79,12 +100,13 @@ static void gst_queue_get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
static void gst_queue_chain (GstPad *pad, GstData *buf);
-static GstBuffer * gst_queue_get (GstPad *pad);
+static GstData * gst_queue_get (GstPad *pad);
static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad);
+static gpointer gst_queue_upstream_event (GstPad *pad, GstData *event);
static void gst_queue_locked_flush (GstQueue *queue);
-/*static void gst_queue_flush (GstQueue *queue);
-*/
+static void gst_queue_flush (GstQueue *queue);
+
static GstElementStateReturn gst_queue_change_state (GstElement *element);
@@ -148,11 +170,14 @@ gst_queue_class_init (GstQueueClass *klass)
g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
0, G_MAXINT, 0, G_PARAM_READABLE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
- g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
- 0, G_MAXINT, 100, G_PARAM_READWRITE));
+ g_param_spec_int ("max-level", "Maximum Level", "How many buffers the queue holds.",
+ 1, G_MAXINT, 100, G_PARAM_READWRITE));
g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
- g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+ g_param_spec_boolean ("may-deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
TRUE, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_WAIT,
+ g_param_spec_ulong ("max-wait", "Max Wait", "How long the queue will wait",
+ 1, 10 * 1000 * 1000, 10 * 1000, G_PARAM_READWRITE));
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
@@ -208,23 +233,22 @@ gst_queue_init (GstQueue *queue)
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
gst_pad_set_connect_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_connect));
gst_pad_set_getcaps_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_getcaps));
-
+ gst_pad_set_event_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_upstream_event));
+
queue->leaky = GST_QUEUE_NO_LEAK;
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->level_bytes = 0;
- queue->level_time = 0LL;
- queue->size_buffers = 100; /* 100 buffers */
- queue->size_bytes = 100 * 1024; /* 100KB */
- queue->size_time = 1000000000LL; /* 1sec */
+ queue->queue = g_async_queue_new ();
+ queue->not_full = g_cond_new ();
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ queue->need_sink = GST_QUEUE_NEED_NOTHING;
+
+ queue->size = 100; /* 100 buffers */
queue->may_deadlock = TRUE;
+ queue->max_wait = 10 * 1000; /* wait max 10 milliseconds */
- queue->qlock = g_mutex_new ();
- queue->reader = FALSE;
- queue->writer = FALSE;
- queue->not_empty = g_cond_new ();
- queue->not_full = g_cond_new ();
- GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
+ queue->upstream_event = NULL;
+ queue->upstream_return = NULL;
+ queue->upstream_mutex = g_mutex_new();
+ queue->upstream_cond = g_cond_new();
}
static void
@@ -232,9 +256,14 @@ gst_queue_dispose (GObject *object)
{
GstQueue *queue = GST_QUEUE (object);
- g_mutex_free (queue->qlock);
- g_cond_free (queue->not_empty);
+ if (queue->upstream_event || queue->upstream_return)
+ g_warning ("losing event while disposing queue");
+ g_cond_free (queue->upstream_cond);
+ g_mutex_free (queue->upstream_mutex);
+
g_cond_free (queue->not_full);
+ g_async_queue_unref (queue->queue);
+
G_OBJECT_CLASS (parent_class)->dispose (object);
}
@@ -250,39 +279,42 @@ gst_queue_get_bufferpool (GstPad *pad)
}
static void
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
-{
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, user_data, "cleaning buffer %p\n", data);
-
- gst_data_unref (GST_DATA (data));
-}
-
-static void
gst_queue_locked_flush (GstQueue *queue)
{
- g_list_foreach (queue->queue, gst_queue_cleanup_buffers,
- (gpointer) queue);
- g_list_free (queue->queue);
-
- queue->queue = NULL;
- queue->level_buffers = 0;
- queue->timeval = NULL;
+ GstData *data;
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "cleaning queue\n");
+ while ((data = GST_DATA (g_async_queue_try_pop_unlocked (queue->queue))) != NULL)
+ {
+ switch (GST_DATA_TYPE (data))
+ {
+ case GST_EVENT_NEWMEDIA:
+ queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA;
+ break;
+ case GST_EVENT_EOS:
+ queue->need_sink |= GST_QUEUE_NEED_EOS;
+ break;
+ default:
+ queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS;
+ break;
+ }
+ gst_data_unref (data);
+ }
+ queue->need_sink |= queue->need_src;
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ g_cond_signal (queue->not_full);
}
-/* warning: `gst_queue_flush' defined but not used
static void
gst_queue_flush (GstQueue *queue)
{
- g_mutex_lock (queue->qlock);
+ g_async_queue_lock (queue->queue);
gst_queue_locked_flush (queue);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
}
-*/
-
static void
gst_queue_chain (GstPad *pad, GstData *buf)
{
GstQueue *queue;
- gboolean reader;
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
@@ -291,134 +323,136 @@ gst_queue_chain (GstPad *pad, GstData *buf)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
- /* we have to lock the queue since we span threads */
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
- g_mutex_lock (queue->qlock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld\n", pthread_self ());
- if (GST_IS_EVENT (buf)) {
- switch (GST_DATA_TYPE (buf)) {
- case GST_EVENT_FLUSH:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "FLUSH event, flushing queue\n");
- gst_queue_locked_flush (queue);
+ /* handle events */
+ if (queue->upstream_event)
+ {
+ g_print ("handling event\n");
+ g_mutex_lock (queue->upstream_mutex);
+ GstPad *peer = GST_PAD_PEER (pad);
+
+ if (peer)
+ {
+ queue->upstream_return = gst_pad_send_event (peer, queue->upstream_event);
+ } else {
+ gst_data_unref (queue->upstream_event);
+ }
+ queue->upstream_event = NULL;
+ g_cond_signal (queue->upstream_cond);
+ g_mutex_unlock (queue->upstream_mutex);
+ g_print ("done handling event\n");
+ }
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n", buf, GST_BUFFER_SIZE(buf));
+
+ /* if we leak on the upstream side, drop the current buffer */
+ if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM && g_async_queue_length (queue->queue) >= queue->size) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
+ /* check if we have to schedule an event for when the stream gets empty */
+ switch (GST_DATA_TYPE (buf))
+ {
+ case GST_EVENT_NEWMEDIA:
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_NEWMEDIA;
+ g_async_queue_unlock (queue->queue);
+ break;
+ case GST_EVENT_DISCONTINUOUS:
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_DISCONTINUOUS;
+ g_async_queue_unlock (queue->queue);
break;
case GST_EVENT_EOS:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "eos in on %s %d\n",
- GST_ELEMENT_NAME (queue), queue->level_buffers);
+ g_async_queue_lock (queue->queue);
+ queue->need_sink |= GST_QUEUE_NEED_EOS;
+ g_async_queue_unlock (queue->queue);
break;
default:
- /* gst_pad_event_default (pad, GST_EVENT (buf));*/
break;
}
- }
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "adding buffer %p of size %d\n",buf,GST_BUFFER_SIZE(buf));
-
- if (queue->level_buffers == queue->size_buffers) {
- /* if this is a leaky queue... */
- if (queue->leaky) {
- /* FIXME don't want to leak events! */
- /* if we leak on the upstream side, drop the current buffer */
- if (queue->leaky == GST_QUEUE_LEAK_UPSTREAM) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
- if (GST_IS_EVENT (buf))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_DATA_TYPE(buf));
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on upstream end\n");
- gst_data_unref(buf);
- /* now we have to clean up and exit right away */
- g_mutex_unlock (queue->qlock);
- return;
- }
- /* otherwise we have to push a buffer off the other end */
- else {
- GList *front;
- GstData *leakbuf;
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
- front = queue->queue;
- leakbuf = (GstData *)(front->data);
- if (GST_IS_EVENT (leakbuf))
- fprintf(stderr, "Error: queue [%s] leaked an event, type:%d\n",
- GST_ELEMENT_NAME(GST_ELEMENT(queue)),
- GST_DATA_TYPE(leakbuf));
- queue->level_buffers--;
- queue->level_bytes -= GST_BUFFER_SIZE(leakbuf);
+ gst_data_unref(buf);
+ /* now we have to exit right away */
+ return;
+ }
+ else if (queue->leaky == GST_QUEUE_LEAK_DOWNSTREAM)
+ {
+ /* while there are too much buffers */
+ while (g_async_queue_length (queue->queue) >= queue->size)
+ {
+ GstData *leakbuf;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue is full, leaking buffer on downstream end\n");
+ g_async_queue_lock (queue->queue);
+ leakbuf = (GstData *) g_async_queue_try_pop_unlocked (queue->queue);
+ if (leakbuf != NULL)
+ {
+ switch (GST_DATA_TYPE (buf))
+ {
+ case GST_EVENT_NEWMEDIA:
+ queue->need_src |= GST_QUEUE_NEED_NEWMEDIA;
+ break;
+ case GST_EVENT_EOS:
+ queue->need_src |= GST_QUEUE_NEED_EOS;
+ break;
+ default:
+ queue->need_src |= GST_QUEUE_NEED_DISCONTINUOUS;
+ break;
+ }
gst_data_unref (leakbuf);
- queue->queue = g_list_remove_link (queue->queue, front);
- g_list_free (front);
}
+ g_async_queue_unlock (queue->queue);
}
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
- queue->level_buffers, queue->size_buffers);
- while (queue->level_buffers == queue->size_buffers) {
- /* if there's a pending state change for this queue or its manager, switch */
- /* back to iterator so bottom half of state change executes */
- while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
- g_mutex_unlock (queue->qlock);
- if (gst_element_interrupt (GST_ELEMENT (queue)))
- return;
- goto restart;
- }
- if (GST_STATE (queue) != GST_STATE_PLAYING) {
- /* this means the other end is shut down */
- /* try to signal to resolve the error */
- if (!queue->may_deadlock) {
- gst_data_unref (buf);
- g_mutex_unlock (queue->qlock);
- gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
- return;
- }
- else {
- gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
- }
+ }
+
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre full wait, level:%d/%d\n",
+ g_async_queue_length (queue->queue), queue->size);
+ /* while there isn't enough space available */
+ if (g_async_queue_length (queue->queue) >= queue->size)
+ {
+ /* if there's a pending state change for this queue or its manager, switch */
+ /* back to iterator so bottom half of state change executes */
+ while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return;
+ goto restart;
+ }
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ /* try to signal to resolve the error */
+ if (!queue->may_deadlock) {
+ gst_data_unref (buf);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+ return;
+ } else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
}
-
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- if (queue->writer)
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple writers on queue!\n");
- queue->writer = TRUE;
- g_cond_wait (queue->not_full, queue->qlock);
- queue->writer = FALSE;
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n",
- queue->level_buffers, queue->size_buffers);
+
+ /* FIXME: we're poking in AsyncQueue internals here */
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size);
+ g_print ("waiting for not null\n");
+ g_mutex_lock (queue->upstream_mutex);
+ if (queue->upstream_event == NULL)
+ g_cond_wait (queue->not_full, queue->upstream_mutex);
+ g_mutex_unlock (queue->upstream_mutex);
+ g_print ("not null\n");
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_full signal\n");
+ goto restart;
}
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post full wait, level:%d/%d\n", g_async_queue_length (queue->queue), queue->size);
/* put the buffer on the tail of the list */
- queue->queue = g_list_append (queue->queue, buf);
- queue->level_buffers++;
- queue->level_bytes += GST_BUFFER_SIZE(buf);
+ g_async_queue_push (queue->queue, buf);
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)+ level:%d/%d\n",
- GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers);
+ GST_DEBUG_PAD_NAME(pad), g_async_queue_length (queue->queue), queue->size);
- /* this assertion _has_ to hold */
- /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
-
- /* reader waiting on an empty queue */
- reader = queue->reader;
-
- g_mutex_unlock (queue->qlock);
-
- if (reader)
- {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_empty\n");
- g_cond_signal (queue->not_empty);
- }
}
-static GstBuffer *
+static GstData *
gst_queue_get (GstPad *pad)
{
GstQueue *queue;
- GstBuffer *buf = NULL;
- GList *front;
- gboolean writer;
+ GstData *data;
g_assert(pad != NULL);
g_assert(GST_IS_PAD(pad));
@@ -428,27 +462,58 @@ gst_queue_get (GstPad *pad)
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
restart:
- /* have to lock for thread-safety */
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
- g_mutex_lock (queue->qlock);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locked t:%ld %p\n", pthread_self (), queue->not_empty);
+
+ /* make sure we're not in the middle of a state change */
+ while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return NULL;
+ }
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- while (queue->level_buffers == 0) {
+ g_async_queue_lock (queue->queue);
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "pre empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ if (queue->need_src)
+ {
+ GstData *event;
+ switch (queue->need_src)
+ {
+ case GST_QUEUE_NEED_NEWMEDIA:
+ event = GST_DATA (gst_event_new_newmedia ());
+ break;
+ case GST_QUEUE_NEED_EOS:
+ event = GST_DATA (gst_event_new_eos ());
+ break;
+ case GST_QUEUE_NEED_DISCONTINUOUS:
+ event = GST_DATA (gst_event_new_discontinuous ());
+ break;
+ default:
+ event = NULL;
+ g_assert_not_reached ();
+ break;
+ }
+ data = g_async_queue_peek_unlocked (queue->queue);
+ if (data)
+ {
+ guint i;
+ for (i = 0; i < GST_OFFSET_TYPES; i++)
+ {
+ event->offset[i] = data->offset[i];
+ }
+ }
+ queue->need_src = GST_QUEUE_NEED_NOTHING;
+ g_async_queue_unlock (queue->queue);
+ return event;
+ }
+ if ((data = g_async_queue_pop_unlocked (queue->queue)) == NULL)
+ {
+ GTimeVal timeval;
/* if there's a pending state change for this queue or its manager, switch
* back to iterator so bottom half of state change executes
*/
- while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
- g_mutex_unlock (queue->qlock);
- if (gst_element_interrupt (GST_ELEMENT (queue)))
- return NULL;
- goto restart;
- }
if (GST_STATE (queue) != GST_STATE_PLAYING) {
/* this means the other end is shut down */
if (!queue->may_deadlock) {
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
goto restart;
}
@@ -457,56 +522,37 @@ restart:
}
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
- if (queue->reader)
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "WARNING: multiple readers on queue!\n");
- queue->reader = TRUE;
- g_cond_wait (queue->not_empty, queue->qlock);
- queue->reader = FALSE;
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ g_get_current_time (&timeval);
+ g_time_val_add (&timeval, queue->max_wait);
+ g_print ("waiting for data\n");
+ data = g_async_queue_timed_pop_unlocked (queue->queue, &timeval);
+ g_print ("%s\n", data == NULL ? "timer goes off while waiting for data\n" : "data available\n");
+ if (data == NULL)
+ {
+ g_async_queue_unlock (queue->queue);
+ gst_element_yield (GST_ELEMENT (data));
+ goto restart;
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "got not_empty signal\n");
}
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
-
- front = queue->queue;
- buf = (GstBuffer *)(front->data);
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "retrieved buffer %p from queue\n", buf);
- queue->queue = g_list_remove_link (queue->queue, front);
- g_list_free (front);
-
- queue->level_buffers--;
- queue->level_bytes -= GST_BUFFER_SIZE(buf);
-
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "post empty wait, level:%d/%d\n", g_async_queue_length_unlocked (queue->queue), queue->size);
+ g_async_queue_unlock (queue->queue);
+
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "(%s:%s)- level:%d/%d\n",
GST_DEBUG_PAD_NAME(pad),
- queue->level_buffers, queue->size_buffers);
-
- /* this assertion _has_ to hold */
- /* g_assert (g_list_length (queue->queue) == queue->level_buffers); */
+ g_async_queue_length (queue->queue), queue->size);
- /* writer waiting on a full queue */
- writer = queue->writer;
-
- g_mutex_unlock (queue->qlock);
-
- if (writer)
- {
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "signalling not_full\n");
- g_cond_signal (queue->not_full);
- }
+ g_cond_signal (queue->not_full);
/* FIXME where should this be? locked? */
- if (GST_IS_EVENT(buf)) {
- switch (GST_DATA_TYPE(buf)) {
- case GST_EVENT_EOS:
- GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
- gst_element_set_eos (GST_ELEMENT (queue));
- break;
- default:
- break;
- }
+ if (GST_DATA_TYPE(data) == GST_EVENT_EOS)
+ {
+ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "queue \"%s\" eos\n", GST_ELEMENT_NAME (queue));
+ gst_element_set_eos (GST_ELEMENT (queue));
}
- return buf;
+ return data;
}
static GstElementStateReturn
@@ -524,13 +570,12 @@ gst_queue_change_state (GstElement *element)
/* lock the queue so another thread (not in sync with this thread's state)
* can't call this queue's _get (or whatever)
*/
- g_mutex_lock (queue->qlock);
+ g_async_queue_lock (queue->queue);
new_state = GST_STATE_PENDING (element);
if (new_state == GST_STATE_PAUSED) {
- //g_cond_signal (queue->not_full);
- //g_cond_signal (queue->not_empty);
+ /* g_cond_signal (queue->not_full); */
}
else if (new_state == GST_STATE_READY) {
gst_queue_locked_flush (queue);
@@ -538,16 +583,14 @@ gst_queue_change_state (GstElement *element)
else if (new_state == GST_STATE_PLAYING) {
if (!GST_PAD_IS_CONNECTED (queue->sinkpad)) {
/* FIXME can this be? */
- if (queue->reader)
- g_cond_signal (queue->not_empty);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
return GST_STATE_FAILURE;
}
}
ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
- g_mutex_unlock (queue->qlock);
+ g_async_queue_unlock (queue->queue);
GST_DEBUG_LEAVE("('%s')", GST_ELEMENT_NAME (element));
return ret;
@@ -567,12 +610,19 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
switch (prop_id) {
case ARG_LEAKY:
queue->leaky = g_value_get_enum (value);
+ g_object_notify (object, "leaky");
break;
case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int (value);
+ queue->size = g_value_get_int (value);
+ g_object_notify (object, "max-level");
break;
case ARG_MAY_DEADLOCK:
queue->may_deadlock = g_value_get_boolean (value);
+ g_object_notify (object, "may-deadlock");
+ break;
+ case ARG_MAX_WAIT:
+ queue->max_wait = g_value_get_ulong (value);
+ g_object_notify (object, "max-wait");
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -595,16 +645,67 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
g_value_set_enum (value, queue->leaky);
break;
case ARG_LEVEL:
- g_value_set_int (value, queue->level_buffers);
+ g_value_set_int (value, g_async_queue_length (queue->queue));
break;
case ARG_MAX_LEVEL:
- g_value_set_int (value, queue->size_buffers);
+ g_value_set_int (value, queue->size);
break;
case ARG_MAY_DEADLOCK:
g_value_set_boolean (value, queue->may_deadlock);
break;
+ case ARG_MAX_WAIT:
+ g_value_set_ulong (value, queue->max_wait);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
+static gpointer
+gst_queue_upstream_event (GstPad *pad, GstData *event)
+{
+ GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ gpointer *ret = NULL;
+
+ /* check that everything is ok. */
+ if (queue->upstream_event || queue->upstream_return)
+ {
+ g_warning ("unhandled event in queue.");
+ queue->upstream_return = NULL;
+ }
+
+ /* transfer event */
+ g_mutex_lock (queue->upstream_mutex);
+ gst_data_ref (event);
+ queue->upstream_event = event;
+ g_cond_signal (queue->not_full);
+ g_print ("waiting for handling of upstream event\n");
+ while (queue->upstream_event != NULL)
+ g_cond_wait (queue->upstream_cond, queue->upstream_mutex);
+ g_print ("upstream event handled\n");
+ ret = queue->upstream_return;
+ queue->upstream_return = NULL;
+ g_mutex_unlock (queue->upstream_mutex);
+
+ /* handle event */
+ if (ret != NULL)
+ {
+ switch (GST_DATA_TYPE (event))
+ {
+ case GST_EVENT_SEEK:
+ if (GST_EVENT_SEEK_FLUSH(event))
+ gst_queue_flush (queue);
+ break;
+ case GST_EVENT_FLUSH:
+ gst_queue_flush (queue);
+ break;
+ default:
+ break;
+ }
+ }
+
+ /* return */
+ gst_data_unref (event);
+ return ret;
+}
+
diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h
index 06485edbc1..48be10adf4 100644
--- a/plugins/elements/gstqueue.h
+++ b/plugins/elements/gstqueue.h
@@ -53,6 +53,13 @@ enum {
GST_QUEUE_LEAK_DOWNSTREAM = 2
};
+enum {
+ GST_QUEUE_NEED_NOTHING = 0x0,
+ GST_QUEUE_NEED_DISCONTINUOUS = 0x1,
+ GST_QUEUE_NEED_EOS = 0x3,
+ GST_QUEUE_NEED_NEWMEDIA = 0x7,
+};
+
typedef struct _GstQueue GstQueue;
typedef struct _GstQueueClass GstQueueClass;
@@ -63,27 +70,21 @@ struct _GstQueue {
GstPad *srcpad;
/* the queue of buffers we're keeping our grubby hands on */
- GList *queue;
-
- guint level_buffers; /* number of buffers queued here */
- guint level_bytes; /* number of bytes queued here */
- guint64 level_time; /* amount of time queued here */
-
- guint size_buffers; /* size of queue in buffers */
- guint size_bytes; /* size of queue in bytes */
- guint64 size_time; /* size of queue in time */
+ GAsyncQueue *queue;
+ GCond *not_full; /* signals space now available for writing */
+ /* lock queue before changing next two values */
+ guint8 need_src; /* we need this event before pushing the next buffer */
+ guint8 need_sink; /* we need this event before popping the next buffer */
+ gint size; /* size of queue in buffers */
gint leaky; /* whether the queue is leaky, and if so at which end */
gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
-
- GMutex *qlock; /* lock for queue (vs object lock) */
- /* we are single reader and single writer queue */
- gboolean reader; /* reader waiting on empty queue */
- gboolean writer; /* writer waiting on full queue */
- GCond *not_empty; /* signals buffers now available for reading */
- GCond *not_full; /* signals space now available for writing */
-
- GTimeVal *timeval; /* the timeout for the queue locking */
+ gulong max_wait; /* the timeout for locking in microseconds */
+
+ GstData *upstream_event;
+ gpointer upstream_return;
+ GMutex *upstream_mutex;
+ GCond *upstream_cond;
};
struct _GstQueueClass {