diff options
author | Carlos Garnacho <carlos@lanedo.com> | 2011-07-14 15:03:19 +0200 |
---|---|---|
committer | Jürg Billeter <j@bitron.ch> | 2011-07-22 14:26:40 +0200 |
commit | 0478db84a158bc4a4c99172c81004115d7fb0453 (patch) | |
tree | eec0c51aab22b697af94d4b202c9cb61b4ae1cec | |
parent | ad173647549ac12b283c6cab2a142838e8193b18 (diff) | |
download | tracker-0478db84a158bc4a4c99172c81004115d7fb0453.tar.gz |
libtracker-miner: Add priorities to the sparql buffer
If a task is of priority G_PRIORITY_HIGH, it will issue an Update()
instead of a UpdateArray(), so updates are most immediate.
-rw-r--r-- | src/libtracker-miner/tracker-miner-fs.c | 4 | ||||
-rw-r--r-- | src/libtracker-miner/tracker-sparql-buffer.c | 84 | ||||
-rw-r--r-- | src/libtracker-miner/tracker-sparql-buffer.h | 1 |
3 files changed, 73 insertions, 16 deletions
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c index 9ca23a581..229475d45 100644 --- a/src/libtracker-miner/tracker-miner-fs.c +++ b/src/libtracker-miner/tracker-miner-fs.c @@ -1891,6 +1891,7 @@ item_add_or_update_cb (TrackerMinerFS *fs, tracker_sparql_buffer_push (fs->priv->sparql_buffer, sparql_task, + ctxt->priority, sparql_buffer_task_finished_cb, fs); @@ -1999,6 +2000,7 @@ item_remove (TrackerMinerFS *fs, tracker_sparql_buffer_push (fs->priv->sparql_buffer, task, + G_PRIORITY_DEFAULT, sparql_buffer_task_finished_cb, fs); @@ -2017,6 +2019,7 @@ item_remove (TrackerMinerFS *fs, tracker_sparql_buffer_push (fs->priv->sparql_buffer, task, + G_PRIORITY_DEFAULT, sparql_buffer_task_finished_cb, fs); @@ -2364,6 +2367,7 @@ item_move (TrackerMinerFS *fs, FALSE)); tracker_sparql_buffer_push (fs->priv->sparql_buffer, task, + G_PRIORITY_DEFAULT, sparql_buffer_task_finished_cb, fs); diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c index bd3abe9b5..cb35b44b7 100644 --- a/src/libtracker-miner/tracker-sparql-buffer.c +++ b/src/libtracker-miner/tracker-sparql-buffer.c @@ -26,6 +26,7 @@ typedef struct _TrackerSparqlBufferPrivate TrackerSparqlBufferPrivate; typedef struct _SparqlTaskData SparqlTaskData; typedef struct _UpdateArrayData UpdateArrayData; +typedef struct _UpdateData UpdateData; typedef struct _BulkOperationMerge BulkOperationMerge; enum { @@ -64,6 +65,11 @@ struct _SparqlTaskData GSimpleAsyncResult *result; }; +struct _UpdateData { + TrackerSparqlBuffer *buffer; + TrackerTask *task; +}; + struct _UpdateArrayData { TrackerSparqlBuffer *buffer; GPtrArray *tasks; @@ -544,9 +550,30 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer, return TRUE; } +static void +tracker_sparql_buffer_update_cb (GObject *object, + GAsyncResult *result, + gpointer user_data) +{ + UpdateData *update_data = user_data; + GError *error = NULL; + + tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), + result, &error); + if (error) { + g_critical ("Error in prioritized update: %s\n", error->message); + g_error_free (error); + } + + tracker_task_pool_remove (TRACKER_TASK_POOL (update_data->buffer), + update_data->task); + g_slice_free (UpdateData, update_data); +} + void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer, TrackerTask *task, + gint priority, GAsyncReadyCallback cb, gpointer user_data) { @@ -558,27 +585,52 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer, priv = buffer->priv; - if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) { - reset_flush_timeout (buffer); - } + data = tracker_task_get_data (task); + data->result = g_simple_async_result_new (G_OBJECT (buffer), + cb, user_data, NULL); - tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task); + if (priority <= G_PRIORITY_HIGH && + data->type != TASK_TYPE_BULK) { + UpdateData *update_data; + const gchar *sparql = NULL; - if (!priv->tasks) { - priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref); - } + /* High priority task */ + update_data = g_slice_new0 (UpdateData); + update_data->buffer = buffer; + update_data->task = task; - g_ptr_array_add (priv->tasks, task); + if (data->type == TASK_TYPE_SPARQL_STR) { + sparql = data->data.str; + } else if (data->type == TASK_TYPE_SPARQL) { + sparql = tracker_sparql_builder_get_result (data->data.builder); + } - data = tracker_task_get_data (task); - data->result = g_simple_async_result_new (G_OBJECT (buffer), - cb, user_data, NULL); + tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task); + tracker_sparql_connection_update_async (priv->connection, + sparql, + G_PRIORITY_HIGH, + NULL, + tracker_sparql_buffer_update_cb, + update_data); + } else { + if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) { + reset_flush_timeout (buffer); + } + + tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task); + + if (!priv->tasks) { + priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref); + } + + g_ptr_array_add (priv->tasks, task); - if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) { - tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached"); - } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) { - /* We've filled half of the buffer, flush it as we receive more tasks */ - tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full"); + if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) { + tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached"); + } else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) { + /* We've filled half of the buffer, flush it as we receive more tasks */ + tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full"); + } } } diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h index 6ee13b470..919ad9ae2 100644 --- a/src/libtracker-miner/tracker-sparql-buffer.h +++ b/src/libtracker-miner/tracker-sparql-buffer.h @@ -65,6 +65,7 @@ gboolean tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer, void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer, TrackerTask *task, + gint priority, GAsyncReadyCallback cb, gpointer user_data); |