summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Thursfield <sam.thursfield@codethink.co.uk>2012-06-27 14:41:29 +0100
committerMartyn Russell <martyn@lanedo.com>2012-07-05 17:02:45 +0100
commit78a889dd380e390a22b02a193b13013875a1796f (patch)
tree891d3aa0f42b46dcdd6798889a8aeee202519ffe
parent9f066839591149ab398ef54c0e95afada952dee2 (diff)
downloadtracker-78a889dd380e390a22b02a193b13013875a1796f.tar.gz
libtracker-miner: Prevent item queue handlers from running when blocked
If we pop the next item from a queue and find that either that file or its parent is still being processed, the item queue handler must wait until the outstanding tasks finish and are committed. In this case we stop the item_queue_handlers_cb() timeout from executing, but it is often restarted early as there are many triggers for item_queue_handlers_set_up(), and the callback then pops the file again, discovers it is still blocked and reenqueues it. This is a waste of time and also makes it hard to implement infinite loop detection reliably. We now remember the file that is blocking progress, and item_queue_handlers_set_up() will not start the item queue handlers until sparql_buffer_task_finished_cb() has been called for the file in question.
-rw-r--r--src/libtracker-miner/tracker-miner-fs.c53
1 files changed, 51 insertions, 2 deletions
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 950b1b841..fb44d5c69 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -165,6 +165,7 @@ struct _TrackerMinerFSPrivate {
GTimer *extraction_timer;
guint item_queues_handler_id;
+ GFile *item_queue_blocker;
gdouble throttle;
@@ -650,6 +651,10 @@ fs_finalize (GObject *object)
if (priv->item_queues_handler_id) {
g_source_remove (priv->item_queues_handler_id);
priv->item_queues_handler_id = 0;
+
+ if (priv->item_queue_blocker) {
+ g_object_unref (priv->item_queue_blocker);
+ }
}
tracker_file_notifier_stop (priv->file_notifier);
@@ -986,6 +991,21 @@ item_writeback_data_free (ItemWritebackData *data)
g_slice_free (ItemWritebackData, data);
}
+static gboolean
+item_queue_is_blocked_by_file (TrackerMinerFS *fs,
+ GFile *file)
+{
+ g_return_val_if_fail (G_IS_FILE (file), FALSE);
+
+ if (fs->priv->item_queue_blocker != NULL &&
+ (fs->priv->item_queue_blocker == file ||
+ g_file_equal (fs->priv->item_queue_blocker, file))) {
+ return TRUE;
+ }
+
+ return FALSE;
+}
+
static void
sparql_buffer_task_finished_cb (GObject *object,
GAsyncResult *result,
@@ -993,6 +1013,8 @@ sparql_buffer_task_finished_cb (GObject *object,
{
TrackerMinerFS *fs;
TrackerMinerFSPrivate *priv;
+ TrackerTask *task;
+ GFile *task_file;
GError *error = NULL;
fs = user_data;
@@ -1005,7 +1027,22 @@ sparql_buffer_task_finished_cb (GObject *object,
g_error_free (error);
}
- item_queue_handlers_set_up (fs);
+ task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+ task_file = tracker_task_get_file (task);
+
+ if (item_queue_is_blocked_by_file (fs, task_file)) {
+ g_object_unref (priv->item_queue_blocker);
+ priv->item_queue_blocker = NULL;
+ }
+
+ if (priv->item_queue_blocker != NULL) {
+ if (tracker_task_pool_get_size (TRACKER_TASK_POOL (object)) > 0) {
+ tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
+ "Item queue still blocked after flush");
+ }
+ } else {
+ item_queue_handlers_set_up (fs);
+ }
}
static UpdateProcessingTaskContext *
@@ -1199,6 +1236,10 @@ item_add_or_update_cb (TrackerMinerFS *fs,
ctxt->priority,
sparql_buffer_task_finished_cb,
fs);
+
+ if (item_queue_is_blocked_by_file (fs, task_file)) {
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
+ }
}
if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
@@ -1708,6 +1749,7 @@ should_wait (TrackerMinerFS *fs,
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
/* Yes, a previous event on same item currently
* being processed */
+ fs->priv->item_queue_blocker = g_object_ref (file);
return TRUE;
}
@@ -1718,7 +1760,7 @@ should_wait (TrackerMinerFS *fs,
tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
/* Yes, a previous event on the parent of this item
* currently being processed */
- g_object_unref (parent);
+ fs->priv->item_queue_blocker = parent;
return TRUE;
}
@@ -2033,6 +2075,7 @@ item_queue_handlers_cb (gpointer user_data)
* on with the queues... */
tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
"Queue handlers WAIT");
+
return FALSE;
}
@@ -2267,6 +2310,12 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
return;
}
+ if (fs->priv->item_queue_blocker) {
+ trace_eq (" cancelled: item queue blocked waiting for file '%s'",
+ g_file_get_path (fs->priv->item_queue_blocker));
+ return;
+ }
+
/* Already sent max number of tasks to tracker-extract/writeback? */
if (tracker_task_pool_limit_reached (fs->priv->task_pool) ||
tracker_task_pool_limit_reached (fs->priv->writeback_pool)) {