summaryrefslogtreecommitdiff
path: root/sys/ipcpipeline
diff options
context:
space:
mode:
authorGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-09-04 15:52:03 +0300
committerGeorge Kiagiadakis <george.kiagiadakis@collabora.com>2017-09-04 15:52:03 +0300
commit15927b6511bc8304ae144a45c9fbfca88e5dd641 (patch)
treef9a0350101f84194a47812e7a41d99b4e29af347 /sys/ipcpipeline
parent91edec25dd2a50e44b970296e55e1570c3e0d33c (diff)
downloadgstreamer-plugins-bad-15927b6511bc8304ae144a45c9fbfca88e5dd641.tar.gz
ipcpipeline: use GstPoll instead of select() to watch for socket activity
... and make that code more readable in the process https://bugzilla.gnome.org/show_bug.cgi?id=787208
Diffstat (limited to 'sys/ipcpipeline')
-rw-r--r--sys/ipcpipeline/gstipcpipelinecomm.c171
-rw-r--r--sys/ipcpipeline/gstipcpipelinecomm.h6
2 files changed, 79 insertions, 98 deletions
diff --git a/sys/ipcpipeline/gstipcpipelinecomm.c b/sys/ipcpipeline/gstipcpipelinecomm.c
index e21da99f8..6f17a11fe 100644
--- a/sys/ipcpipeline/gstipcpipelinecomm.c
+++ b/sys/ipcpipeline/gstipcpipelinecomm.c
@@ -1605,15 +1605,16 @@ gst_ipc_pipeline_comm_init (GstIpcPipelineComm * comm, GstElement * element)
g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL,
(GDestroyNotify) comm_request_free);
comm->adapter = gst_adapter_new ();
- g_atomic_int_set (&comm->thread_running, 0);
+ comm->poll = gst_poll_new (TRUE);
+ gst_poll_fd_init (&comm->pollFDin);
}
void
gst_ipc_pipeline_comm_clear (GstIpcPipelineComm * comm)
{
- g_assert (!g_atomic_int_get (&comm->thread_running));
g_hash_table_destroy (comm->waiting_ids);
gst_object_unref (comm->adapter);
+ gst_poll_free (comm->poll);
g_mutex_clear (&comm->mutex);
}
@@ -1698,91 +1699,75 @@ gst_ipc_pipeline_comm_reply_request (GstIpcPipelineComm * comm, guint32 id,
return TRUE;
}
-static gboolean
+static gint
update_adapter (GstIpcPipelineComm * comm)
{
GstMemory *mem = NULL;
-
- for (;;) {
- fd_set set;
- struct timeval tv;
- int sret;
- ssize_t sz;
- GstBuffer *buf;
- GstMapInfo map;
- int fdin = comm->fdin;
- int fdclose = comm->reader_thread_stopping_pipe[0];
- int fdmax;
-
- FD_ZERO (&set);
- FD_SET (fdclose, &set);
- fdmax = fdclose;
- if (fdin >= 0 && GST_ELEMENT_PARENT (comm->element)) {
- FD_SET (fdin, &set);
- if (fdin > fdmax)
- fdmax = fdin;
- }
- tv.tv_sec = 0;
- tv.tv_usec = 100000;
- sret = select (fdmax + 1, &set, NULL, NULL, &tv);
- if (sret < 0) {
- if (errno == EAGAIN)
- continue;
- if (errno == EINTR)
- break;
- if (mem)
- gst_memory_unref (mem);
- return FALSE;
+ GstBuffer *buf;
+ GstMapInfo map;
+ ssize_t sz;
+ gint ret = 0;
+
+again:
+ /* update pollFDin if necessary (fdin changed or we lost our parent).
+ * we do not allow a parent-less element to communicate with its peer
+ * in order to avoid race conditions where the slave tries to change
+ * the state of its parent pipeline while it is not yet added in that
+ * pipeline. */
+ if (comm->pollFDin.fd != comm->fdin || !GST_OBJECT_PARENT (comm->element)) {
+ if (comm->pollFDin.fd != -1) {
+ GST_DEBUG_OBJECT (comm->element, "Stop watching fd %d",
+ comm->pollFDin.fd);
+ gst_poll_remove_fd (comm->poll, &comm->pollFDin);
+ gst_poll_fd_init (&comm->pollFDin);
}
- if (FD_ISSET (fdclose, &set)) {
- GST_INFO_OBJECT (comm->element, "data received on close notify pipe");
- comm->reader_thread_stopping = TRUE;
- break;
+ if (comm->fdin != -1 && GST_OBJECT_PARENT (comm->element)) {
+ GST_DEBUG_OBJECT (comm->element, "Start watching fd %d", comm->fdin);
+ comm->pollFDin.fd = comm->fdin;
+ gst_poll_add_fd (comm->poll, &comm->pollFDin);
+ gst_poll_fd_ctl_read (comm->poll, &comm->pollFDin, TRUE);
}
- if (fdin < 0)
- break;
- if (!FD_ISSET (fdin, &set))
- break;
- if (mem == NULL)
+ }
+
+ /* wait for activity on fdin or a flush */
+ if (gst_poll_wait (comm->poll, 100 * GST_MSECOND) < 0) {
+ if (errno == EAGAIN)
+ goto again;
+ /* error out, unless interrupted or flushing */
+ if (errno != EINTR)
+ ret = (errno == EBUSY) ? 2 : 1;
+ }
+
+ /* read from fdin if possible and push data to our adapter */
+ if (comm->pollFDin.fd >= 0
+ && gst_poll_fd_can_read (comm->poll, &comm->pollFDin)) {
+ if (!mem)
mem = gst_allocator_alloc (NULL, comm->read_chunk_size, NULL);
+
gst_memory_map (mem, &map, GST_MAP_WRITE);
- sz = read (fdin, map.data, map.size);
+ sz = read (comm->pollFDin.fd, map.data, map.size);
gst_memory_unmap (mem, &map);
- if (sz < 0) {
+
+ if (sz <= 0) {
if (errno == EAGAIN)
- continue;
+ goto again;
+ /* error out, unless interrupted */
+ if (errno != EINTR)
+ ret = 1;
+ } else {
+ gst_memory_resize (mem, 0, sz);
+ buf = gst_buffer_new ();
+ gst_buffer_append_memory (buf, mem);
mem = NULL;
- if (errno == EINTR)
- break;
- gst_memory_unref (mem);
- return FALSE;
+ GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
+ gst_adapter_push (comm->adapter, buf);
}
- if (sz == 0) {
- GST_INFO_OBJECT (comm->element, "fd closed");
- comm->reader_thread_stopping = TRUE;
- break;
- }
- gst_memory_resize (mem, 0, sz);
- buf = gst_buffer_new ();
- gst_buffer_append_memory (buf, mem);
- mem = NULL;
- GST_TRACE_OBJECT (comm->element, "Read %u bytes from fd", (unsigned) sz);
- gst_adapter_push (comm->adapter, buf);
-
- /* If we have more data, we loop, otherwise we break */
- FD_ZERO (&set);
- if (fdin >= 0)
- FD_SET (comm->fdin, &set);
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- sret = select (fdin + 1, &set, NULL, NULL, &tv);
- if (sret < 0 || !FD_ISSET (fdin, &set))
- break;
}
+
if (mem)
gst_memory_unref (mem);
- return TRUE;
+ return ret;
}
static gboolean
@@ -2151,24 +2136,28 @@ static gpointer
reader_thread (gpointer data)
{
GstIpcPipelineComm *comm = (GstIpcPipelineComm *) data;
-
- g_atomic_int_set (&comm->thread_running, 1);
- while (!comm->reader_thread_stopping) {
- if (!update_adapter (comm)) {
- if (comm->reader_thread_stopping) {
+ gboolean running = TRUE;
+ gint ret = 0;
+
+ while (running) {
+ ret = update_adapter (comm);
+ switch (ret) {
+ case 1:
+ GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
+ ("Failed to read from socket"));
+ running = FALSE;
+ break;
+ case 2:
GST_INFO_OBJECT (comm->element, "We're stopping, all good");
+ running = FALSE;
+ break;
+ default:
+ read_many (comm);
break;
- }
- GST_ELEMENT_ERROR (comm->element, RESOURCE, READ, (NULL),
- ("Failed to read from socket"));
- break;
}
- read_many (comm);
}
GST_INFO_OBJECT (comm->element, "Reader thread ending");
- g_atomic_int_set (&comm->thread_running, 0);
-
return NULL;
}
@@ -2184,7 +2173,6 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
if (comm->reader_thread)
return FALSE;
- comm->reader_thread_stopping = FALSE;
comm->state = GST_IPC_PIPELINE_COMM_STATE_TYPE;
comm->on_buffer = on_buffer;
comm->on_event = on_event;
@@ -2193,10 +2181,7 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
comm->on_state_lost = on_state_lost;
comm->on_message = on_message;
comm->user_data = user_data;
- if (pipe (comm->reader_thread_stopping_pipe) < 0) {
- GST_WARNING_OBJECT (comm->element, "Failed to create pipes");
- return FALSE;
- }
+ gst_poll_set_flushing (comm->poll, FALSE);
comm->reader_thread =
g_thread_new ("reader", (GThreadFunc) reader_thread, comm);
return TRUE;
@@ -2205,15 +2190,11 @@ gst_ipc_pipeline_comm_start_reader_thread (GstIpcPipelineComm * comm,
void
gst_ipc_pipeline_comm_stop_reader_thread (GstIpcPipelineComm * comm)
{
- char dummy = 0;
-
if (!comm->reader_thread)
return;
- while (write (comm->reader_thread_stopping_pipe[1], &dummy, 1) < 0
- && errno == EINTR);
+
+ gst_poll_set_flushing (comm->poll, TRUE);
g_thread_join (comm->reader_thread);
- close (comm->reader_thread_stopping_pipe[0]);
- close (comm->reader_thread_stopping_pipe[1]);
comm->reader_thread = NULL;
}
diff --git a/sys/ipcpipeline/gstipcpipelinecomm.h b/sys/ipcpipeline/gstipcpipelinecomm.h
index bd7335e9e..cebbadb21 100644
--- a/sys/ipcpipeline/gstipcpipelinecomm.h
+++ b/sys/ipcpipeline/gstipcpipelinecomm.h
@@ -64,9 +64,9 @@ typedef struct
GHashTable *waiting_ids;
GThread *reader_thread;
- gboolean reader_thread_stopping;
- volatile gint thread_running;
- int reader_thread_stopping_pipe[2];
+ GstPoll *poll;
+ GstPollFD pollFDin;
+
GstAdapter *adapter;
guint8 state;
guint32 send_id;