summaryrefslogtreecommitdiff
path: root/gioscheduler.c
diff options
context:
space:
mode:
authorAlexander Larsson <alexl@src.gnome.org>2007-09-13 07:47:11 +0000
committerAlexander Larsson <alexl@src.gnome.org>2007-09-13 07:47:11 +0000
commit4ffa0e576a67189362214d5b57c5e1356de41e8b (patch)
treec949c7af7e562cdff3b8b7b532357b51d90e2424 /gioscheduler.c
parent9415407715053833af296e03440e3760fd3e75ac (diff)
downloadgvfs-4ffa0e576a67189362214d5b57c5e1356de41e8b.tar.gz
Initial revision
Original git commit by alex <alex> at 1159272204 +0000 svn path=/trunk/; revision=3
Diffstat (limited to 'gioscheduler.c')
-rw-r--r--gioscheduler.c272
1 files changed, 272 insertions, 0 deletions
diff --git a/gioscheduler.c b/gioscheduler.c
new file mode 100644
index 00000000..d49b72eb
--- /dev/null
+++ b/gioscheduler.c
@@ -0,0 +1,272 @@
+#include <config.h>
+
+#include "gioscheduler.h"
+
/* One queued I/O job. Other than from the io thread itself, a job
 * may only be accessed while the job_map lock is held (see below). */
struct _GIOJob {
  gint id;                        /* key in job_map */
  GIOJobFunc job_func;            /* the actual work, run in a pool thread */
  GIODataFunc cancel_func;        /* Runs under job map lock */
  gpointer data;                  /* user data handed to the callbacks */
  GDestroyNotify destroy_notify;  /* frees data when the job finishes; may be NULL */

  GMainContext *callback_context; /* context used for mainloop callbacks */
  gint io_priority;               /* lower value => higher priority (see g_io_job_compare) */
  gboolean cancelled;             /* set under the job_map lock by the cancel calls */
};
+
/* Maps job id -> GIOJob for every job that has not yet finished */
static GHashTable *job_map = NULL;
static GThreadPool *job_thread_pool = NULL;
/* Next id to hand out; bumped atomically in g_schedule_io_job */
static gint next_job_id = 1;
/* Serializes access to the job_map hash, and handles
 * lifetime issues for the jobs. (i.e. other than the
 * io thread you can only access the job when the job_map
 * lock is held) */
G_LOCK_DEFINE_STATIC(job_map);

/* Thread-pool worker entry point (defined below) */
static void io_job_thread (gpointer data,
			   gpointer user_data);
+
+
+static gint
+g_io_job_compare (gconstpointer a,
+ gconstpointer b,
+ gpointer user_data)
+{
+ const GIOJob *aa = a;
+ const GIOJob *bb = b;
+
+ /* Lower value => higher priority */
+
+ if (aa->io_priority < bb->io_priority)
+ return -1;
+ if (aa->io_priority == bb->io_priority)
+ return 0;
+ return 1;
+}
+
+static void
+init_scheduler (void)
+{
+ if (job_map == NULL)
+ {
+ /* TODO: thread_pool_new can fail */
+ job_thread_pool = g_thread_pool_new (io_job_thread,
+ NULL,
+ 10,
+ FALSE,
+ NULL);
+ g_thread_pool_set_sort_function (job_thread_pool,
+ g_io_job_compare,
+ NULL);
+ job_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+ }
+}
+
/* Thread-pool worker: runs one job to completion and tears it down.
 * data is the GIOJob; user_data is the pool user data (unused). */
static void
io_job_thread (gpointer data,
	       gpointer user_data)
{
  GIOJob *job = data;

  /* Run the actual work; no lock is held here */
  job->job_func (job, job->data);

  /* Note: We can still get cancel calls here, which means
   * we can't free the data until after removal from the
   * job_map.
   */

  /* Unpublish the job; once removed, no cancel call can reach it */
  G_LOCK (job_map);
  g_hash_table_remove (job_map, GINT_TO_POINTER (job->id));
  G_UNLOCK (job_map);

  /* Safe to release user data now (see note above) */
  if (job->destroy_notify)
    job->destroy_notify (job->data);

  g_main_context_unref (job->callback_context);
  g_free (job);
}
+
+
+gint
+g_schedule_io_job (GIOJobFunc job_func,
+ GIODataFunc cancel_func,
+ gpointer data,
+ GDestroyNotify notify,
+ gint io_priority,
+ GMainContext *callback_context)
+{
+ GIOJob *job;
+
+ if (callback_context == NULL)
+ callback_context = g_main_context_default ();
+
+ job = g_new0 (GIOJob, 1);
+ job->id = g_atomic_int_exchange_and_add (&next_job_id, 1);
+ job->job_func = job_func;
+ job->cancel_func = cancel_func;
+ job->data = data;
+ job->destroy_notify = notify;
+ job->io_priority = io_priority;
+ job->callback_context = g_main_context_ref (callback_context);
+ job->cancelled = FALSE;
+
+ G_LOCK (job_map);
+ init_scheduler ();
+
+ g_hash_table_insert (job_map, GINT_TO_POINTER (job->id), job);
+
+ G_UNLOCK (job_map);
+
+ /* TODO: We ignore errors */
+ g_thread_pool_push (job_thread_pool, job, NULL);
+
+ return job->id;
+}
+
+
+void
+g_cancel_io_job (gint id)
+{
+ GIOJob *job;
+
+ G_LOCK (job_map);
+ init_scheduler ();
+
+ job = g_hash_table_lookup (job_map, GINT_TO_POINTER (id));
+
+ if (job && !job->cancelled)
+ {
+ job->cancelled = TRUE;
+ if (job->cancel_func)
+ job->cancel_func (job->data);
+ }
+
+ G_UNLOCK (job_map);
+}
+
+/* Called with job_map lock held */
+static void
+foreach_job_cancel (gpointer key,
+ gpointer value,
+ gpointer user_data)
+{
+ GIOJob *job = value;
+
+ if (!job->cancelled)
+ {
+ job->cancelled = TRUE;
+ if (job->cancel_func)
+ job->cancel_func (job->data);
+ }
+}
+
+void
+g_cancel_all_io_jobs (void)
+{
+ G_LOCK (job_map);
+ init_scheduler ();
+ g_hash_table_foreach (job_map,
+ foreach_job_cancel,
+ NULL);
+
+
+ G_UNLOCK (job_map);
+}
+
+typedef struct {
+ GIODataFunc func;
+ gpointer data;
+ GDestroyNotify notify;
+
+ GMutex *ack_lock;
+ GCond *ack_condition;
+} MainLoopProxy;
+
+static gboolean
+mainloop_proxy_func (gpointer data)
+{
+ MainLoopProxy *proxy = data;
+
+ proxy->func (proxy->data);
+
+ if (proxy->ack_lock)
+ {
+ g_mutex_lock (proxy->ack_lock);
+ g_cond_signal (proxy->ack_condition);
+ g_mutex_unlock (proxy->ack_lock);
+ }
+
+ return FALSE;
+}
+
+static void
+mainloop_proxy_free (MainLoopProxy *proxy)
+{
+ if (proxy->ack_lock)
+ {
+ g_mutex_free (proxy->ack_lock);
+ g_cond_free (proxy->ack_condition);
+ }
+
+ g_free (proxy);
+}
+
+static void
+mainloop_proxy_notify (gpointer data)
+{
+ MainLoopProxy *proxy = data;
+
+ if (proxy->notify)
+ proxy->notify (proxy->data);
+
+ /* If nonblocking we free here, otherwise we free in io thread */
+ if (proxy->ack_lock == NULL)
+ mainloop_proxy_free (proxy);
+}
+
+void
+g_io_job_send_to_mainloop (GIOJob *job,
+ GIODataFunc func,
+ gpointer data,
+ GDestroyNotify notify,
+ gboolean block)
+{
+ GSource *source;
+ MainLoopProxy *proxy;
+ guint id;
+
+ proxy = g_new0 (MainLoopProxy, 1);
+ proxy->func = func;
+ proxy->data = data;
+ proxy->notify = notify;
+ if (block)
+ {
+ proxy->ack_lock = g_mutex_new ();
+ proxy->ack_condition = g_cond_new ();
+ }
+
+ source = g_idle_source_new ();
+ g_source_set_priority (source, G_PRIORITY_DEFAULT);
+
+ g_source_set_callback (source, mainloop_proxy_func, proxy, mainloop_proxy_notify);
+
+ if (block)
+ g_mutex_lock (proxy->ack_lock);
+
+ id = g_source_attach (source, job->callback_context);
+ g_source_unref (source);
+
+ if (block) {
+ g_cond_wait (proxy->ack_condition, proxy->ack_lock);
+ g_mutex_unlock (proxy->ack_lock);
+
+ /* destroy notify didn't free proxy */
+ mainloop_proxy_free (proxy);
+ }
+}
+
/* Returns whether @job has been cancelled.
 * NOTE(review): this reads job->cancelled without the job_map lock;
 * presumably intended as a cheap poll from the io thread running the
 * job — confirm that a stale read here is acceptable to callers. */
gboolean
g_io_job_is_cancelled (GIOJob *job)
{
  return job->cancelled;
}