summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Playfair Cal <daniel.playfair.cal@gmail.com>2018-11-16 23:42:08 +0000
committerDaniel Playfair Cal <daniel.playfair.cal@gmail.com>2018-11-16 23:42:08 +0000
commit0b440a2e8689dcd7bdf70b69aff6125751b936a0 (patch)
treebafb0fc9bfd0af472a61b63eb73ad466919d5b48
parent7a033fa8b762c2a247e32317ffd436dd6403c942 (diff)
parent15f383599c606c95413c0a4ed382c186b1f42ac3 (diff)
downloaddconf-0b440a2e8689dcd7bdf70b69aff6125751b936a0.tar.gz
Merge branch 'coalesce-writes' into 'master'
Coalesce pending writes into a single changeset See merge request GNOME/dconf!30
-rw-r--r--engine/dconf-engine.c206
-rw-r--r--tests/client.c114
2 files changed, 157 insertions, 163 deletions
diff --git a/engine/dconf-engine.c b/engine/dconf-engine.c
index 06d7dd6..18b8aa5 100644
--- a/engine/dconf-engine.c
+++ b/engine/dconf-engine.c
@@ -75,36 +75,15 @@
* changed but then quickly changed back again by some external
* agent.
*
- * In fast mode we have to do some management of the queue. If we
- * immediately put all requests "in flight" then we can end up in a
- * situation where the application writes many values for the same key
- * and the service is kept (needlessly) busy writing over and over to
- * the same key for some time after the requests stop coming in.
+ * In fast mode if we were to immediately put all requests "in flight",
+ * then we could end up in a situation where the service is kept
+ * (needlessly) busy rewriting the database over and over again after a
+ * sequence of fast changes on the client side.
*
- * If we limit the number of in-flight requests and put the other ones
- * into a pending queue then we can perform merging of similar changes.
- * If we notice that an item in the pending queue writes to the same
- * keys as the newly-added request then we can simply drop the existing
- * request (since its effect will be nullified by the new request).
- *
- * We want to keep the number of in-flight requests low in order to
- * maximise our chance of dropping pending items, but we probably want
- * it higher than 1 so that we can pipeline to hide latency.
- *
- * In order to minimise complexity, all changes go first to the pending
- * queue. Changes are dispatched from the pending queue (and moved to
- * the in-flight queue) when the number of requests in-flight is lower
- * than the maximum.
- *
- * For both 'in_flight' and 'pending' queues we push to the tail and pop
- * from the head. This puts the first operation on the head and the
- * most recent operation on the tail.
- *
- * Since new operation go first to the pending queue, we find the most
- * recent operations at the tail of that queue. Since we want to return
- * the most-recently written value, we therefore scan for values
- * starting at the tail of the pending queue and ending at the head of
- * the in-flight queue.
+ * To avoid the issue we limit the number of in-flight requests to one.
+ * If a request is already in flight, subsequent changes are merged into
+ * a single aggregated pending change to be submitted as the next write
+ * after the in-flight request completes.
*
* NB: I tell a lie. Async is not supported yet.
*
@@ -140,8 +119,9 @@
* 'sources' array itself (and 'n_sources') are set at construction and
* never change after that.
*
- * The second lock (queue_lock) protects the various queues that are
- * used to implement the "fast" writes described above.
+ * The second lock (queue_lock) protects the queue (represented with two
+ * fields pending and in_flight) used to implement the "fast" writes
+ * described above.
*
* The third lock (subscription_count_lock) protects the two hash tables
* that are used to keep track of the number of subscriptions held by
@@ -154,8 +134,6 @@
* sources_lock or queue_lock
*/
-#define MAX_IN_FLIGHT 2
-
static GSList *dconf_engine_global_list;
static GMutex dconf_engine_global_lock;
@@ -171,9 +149,9 @@ struct _DConfEngine
gint n_sources;
GMutex queue_lock; /* This lock is for pending, in_flight, queue_cond */
- GCond queue_cond; /* Signalled when the queues empty */
- GQueue pending; /* DConfChangeset */
- GQueue in_flight; /* DConfChangeset */
+ GCond queue_cond; /* Signalled when there are neither in-flight nor pending changes. */
+ DConfChangeset *pending; /* Yet to be sent on the wire. */
+ DConfChangeset *in_flight; /* Already sent but awaiting response. */
gchar *last_handled; /* reply tag from last item in in_flight */
@@ -229,13 +207,13 @@ dconf_engine_release_sources (DConfEngine *engine)
}
static void
-dconf_engine_lock_queues (DConfEngine *engine)
+dconf_engine_lock_queue (DConfEngine *engine)
{
g_mutex_lock (&engine->queue_lock);
}
static void
-dconf_engine_unlock_queues (DConfEngine *engine)
+dconf_engine_unlock_queue (DConfEngine *engine)
{
g_mutex_unlock (&engine->queue_lock);
}
@@ -402,11 +380,8 @@ dconf_engine_unref (DConfEngine *engine)
g_free (engine->last_handled);
- while (!g_queue_is_empty (&engine->pending))
- dconf_changeset_unref ((DConfChangeset *) g_queue_pop_head (&engine->pending));
-
- while (!g_queue_is_empty (&engine->in_flight))
- dconf_changeset_unref ((DConfChangeset *) g_queue_pop_head (&engine->in_flight));
+ g_clear_pointer (&engine->pending, dconf_changeset_unref);
+ g_clear_pointer (&engine->in_flight, dconf_changeset_unref);
for (i = 0; i < engine->n_sources; i++)
dconf_engine_source_free (engine->sources[i]);
@@ -730,15 +705,18 @@ dconf_engine_read (DConfEngine *engine,
*/
if (!found_key)
{
- dconf_engine_lock_queues (engine);
+ dconf_engine_lock_queue (engine);
- /* Check the pending queue first because those were submitted
+ /* Check the pending first because those were submitted
* more recently.
*/
- found_key = dconf_engine_find_key_in_queue (&engine->pending, key, &value) ||
- dconf_engine_find_key_in_queue (&engine->in_flight, key, &value);
+ if (engine->pending != NULL)
+ found_key = dconf_changeset_get (engine->pending, key, &value);
+
+ if (!found_key && engine->in_flight != NULL)
+ found_key = dconf_changeset_get (engine->in_flight, key, &value);
- dconf_engine_unlock_queues (engine);
+ dconf_engine_unlock_queue (engine);
}
/* Step 4. Check the first source. */
@@ -1137,19 +1115,17 @@ dconf_engine_prepare_change (DConfEngine *engine,
(GDestroyNotify) g_variant_unref, g_variant_ref_sink (serialised));
}
-/* This function promotes changes from the pending queue to the
- * in-flight queue by sending the appropriate D-Bus message.
+/* This function promotes the pending changeset to become the in-flight
+ * changeset by sending the appropriate D-Bus message.
*
- * Of course, this is only possible when there are pending items and
- * room in the in-flight queue. For this reason, this function gets
- * called in two situations:
+ * Of course, this is only possible when there is a pending changeset
+ * and no changeset is in-flight already. For this reason, this function
+ * gets called in two situations:
*
- * - an item has been added to the pending queue (due to an API call)
+ * - when there is a new pending changeset (due to an API call)
*
- * - an item has been removed from the inflight queue (due to a D-Bus
+ * - when in-flight changeset had been delivered (due to a D-Bus
* reply having been received)
- *
- * It will move a maximum of one item.
*/
static void dconf_engine_manage_queue (DConfEngine *engine);
@@ -1172,42 +1148,16 @@ dconf_engine_change_completed (DConfEngine *engine,
const GError *error)
{
OutstandingChange *oc = handle;
+ DConfChangeset *expected;
- dconf_engine_lock_queues (engine);
-
- /* D-Bus guarantees ordered delivery of messages.
- *
- * The dconf-service handles requests in-order.
- *
- * The reply we just received should therefore be at the head of
- * our 'in flight' queue.
- *
- * Due to https://bugs.freedesktop.org/show_bug.cgi?id=59780 it is
- * possible that we receive an out-of-sequence error message, however,
- * so only assume that messages are in-order for positive replies.
- */
- if (reply)
- {
- DConfChangeset *expected;
-
- expected = g_queue_pop_head (&engine->in_flight);
- g_assert (expected && oc->change == expected);
- }
- else
- {
- gboolean found;
-
- g_assert (error != NULL);
+ dconf_engine_lock_queue (engine);
- found = g_queue_remove (&engine->in_flight, oc->change);
- g_assert (found);
- }
+ expected = g_steal_pointer (&engine->in_flight);
+ g_assert (expected && oc->change == expected);
- /* We just popped a change from the in-flight queue, possibly
- * making room for another to be added. Check that.
- */
+ /* Another request could be sent now. Check for pending changes. */
dconf_engine_manage_queue (engine);
- dconf_engine_unlock_queues (engine);
+ dconf_engine_unlock_queue (engine);
/* Deal with the reply we got. */
if (reply)
@@ -1249,7 +1199,7 @@ dconf_engine_change_completed (DConfEngine *engine,
static void
dconf_engine_manage_queue (DConfEngine *engine)
{
- if (!g_queue_is_empty (&engine->pending) && g_queue_get_length (&engine->in_flight) < MAX_IN_FLIGHT)
+ if (engine->pending != NULL && engine->in_flight == NULL)
{
OutstandingChange *oc;
GVariant *parameters;
@@ -1257,7 +1207,7 @@ dconf_engine_manage_queue (DConfEngine *engine)
oc = dconf_engine_call_handle_new (engine, dconf_engine_change_completed,
G_VARIANT_TYPE ("(s)"), sizeof (OutstandingChange));
- oc->change = g_queue_pop_head (&engine->pending);
+ oc->change = engine->in_flight = g_steal_pointer (&engine->pending);
parameters = dconf_engine_prepare_change (engine, oc->change);
@@ -1266,16 +1216,14 @@ dconf_engine_manage_queue (DConfEngine *engine)
engine->sources[0]->object_path,
"ca.desrt.dconf.Writer", "Change",
parameters, &oc->handle, NULL);
-
- g_queue_push_tail (&engine->in_flight, oc->change);
}
- if (g_queue_is_empty (&engine->in_flight))
+ if (engine->in_flight == NULL)
{
/* The in-flight queue should not be empty if we have changes
* pending...
*/
- g_assert (g_queue_is_empty (&engine->pending));
+ g_assert (engine->pending == NULL);
g_cond_broadcast (&engine->queue_cond);
}
@@ -1321,7 +1269,6 @@ dconf_engine_change_fast (DConfEngine *engine,
gpointer origin_tag,
GError **error)
{
- GList *node;
g_debug ("change_fast");
if (dconf_changeset_is_empty (changeset))
return TRUE;
@@ -1331,50 +1278,23 @@ dconf_engine_change_fast (DConfEngine *engine,
dconf_changeset_seal (changeset);
- /* Check for duplicates in the pending queue.
- *
- * Note: order doesn't really matter here since "similarity" is an
- * equivalence class and we've ensured that there are no pairwise
- * similar changes in the queue already (ie: at most we will have only
- * one similar item to the one we are adding).
- */
- dconf_engine_lock_queues (engine);
+ dconf_engine_lock_queue (engine);
- for (node = g_queue_peek_head_link (&engine->pending); node; node = node->next)
- {
- DConfChangeset *queued_change = node->data;
+ /* The pending changeset is kept unsealed so that it can be modified
+ * by later calls to this functions. It wouldn't be a good idea to
+ * repurpose the incoming changeset for this role, so create a new
+ * one if necessary. */
+ if (engine->pending == NULL)
+ engine->pending = dconf_changeset_new ();
- if (dconf_changeset_is_similar_to (changeset, queued_change))
- {
- /* We found a similar item in the queue.
- *
- * We want to drop the one that's in the queue already since
- * we want our new (more recent) change to take precedence.
- *
- * The pending queue owned the changeset, so free it.
- */
- g_queue_delete_link (&engine->pending, node);
- dconf_changeset_unref (queued_change);
+ dconf_changeset_change (engine->pending, changeset);
- /* There will only have been one, so stop looking. */
- break;
- }
- }
-
- /* No matter what we're going to queue up this change, so put it in
- * the pending queue now.
- *
- * There may be room in the in_flight queue, so we try to manage the
- * queue right away in order to try to promote it there (which causes
- * the D-Bus message to actually be sent).
- *
- * The change might get tossed before being sent if the loop above
- * finds it on a future call.
- */
- g_queue_push_tail (&engine->pending, dconf_changeset_ref (changeset));
+ /* There might be no in-flight request yet, so we try to manage the
+ * queue right away in order to try to promote pending changes there
+ * (which causes the D-Bus message to actually be sent). */
dconf_engine_manage_queue (engine);
- dconf_engine_unlock_queues (engine);
+ dconf_engine_unlock_queue (engine);
/* Emit the signal after dropping the lock to avoid deadlock on re-entry. */
dconf_engine_emit_changes (engine, changeset, origin_tag);
@@ -1500,7 +1420,7 @@ dconf_engine_handle_dbus_signal (GBusType type,
/* It's possible that this incoming change notify is for a
* change that we already announced to the client when we
- * placed it in the pending queue.
+ * placed it in the queue.
*
* Check last_handled to determine if we should ignore it.
*/
@@ -1555,12 +1475,12 @@ dconf_engine_has_outstanding (DConfEngine *engine)
{
gboolean has;
- /* The in-flight queue will never be empty unless the pending queue is
+ /* The in-flight will never be empty unless the pending is
* also empty, so we only really need to check one of them...
*/
- dconf_engine_lock_queues (engine);
- has = !g_queue_is_empty (&engine->in_flight);
- dconf_engine_unlock_queues (engine);
+ dconf_engine_lock_queue (engine);
+ has = engine->in_flight != NULL;
+ dconf_engine_unlock_queue (engine);
return has;
}
@@ -1569,8 +1489,8 @@ void
dconf_engine_sync (DConfEngine *engine)
{
g_debug ("sync");
- dconf_engine_lock_queues (engine);
- while (!g_queue_is_empty (&engine->in_flight))
+ dconf_engine_lock_queue (engine);
+ while (engine->in_flight != NULL)
g_cond_wait (&engine->queue_cond, &engine->queue_lock);
- dconf_engine_unlock_queues (engine);
+ dconf_engine_unlock_queue (engine);
}
diff --git a/tests/client.c b/tests/client.c
index 4727e0c..1773ed1 100644
--- a/tests/client.c
+++ b/tests/client.c
@@ -58,7 +58,7 @@ queue_up_100_writes (DConfClient *client)
gint i;
/* We send 100 writes, letting them pile up.
- * At no time should there be more than 2 writes on the wire.
+ * At no time should there be more than one write on the wire.
*/
for (i = 0; i < 100; i++)
{
@@ -71,7 +71,7 @@ queue_up_100_writes (DConfClient *client)
check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL);
}
- g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), ==, 2);
+ g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), ==, 1);
}
static void
@@ -108,7 +108,6 @@ static void
test_fast (void)
{
DConfClient *client;
- gint i;
g_log_set_writer_func (log_writer_cb, NULL, NULL);
@@ -119,30 +118,23 @@ test_fast (void)
/* Start indicating that the writes failed.
*
- * For the first failures, we should continue to see the most recently
- * written value (99).
- *
- * After we fail that last one, we should see NULL returned.
+ * Because of the pending-merge logic, we should only have had to fail two calls.
*
* Each time, we should see a change notify.
*/
- for (i = 0; g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles) > 1; i++)
- {
- changed_was_called = FALSE;
- fail_one_call ();
- g_assert (changed_was_called);
+ g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 1);
- check_and_free (dconf_client_read (client, "/test/value"), g_variant_new_int32 (99));
- check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL);
- }
+ changed_was_called = FALSE;
+ fail_one_call ();
+ g_assert (changed_was_called);
- /* Because of the pending-merging logic, we should only have had to
- * fail two calls.
- */
- g_assert (i == 2);
+ /* For the first failure, we should continue to see the most recently written value (99) */
+ check_and_free (dconf_client_read (client, "/test/value"), g_variant_new_int32 (99));
+ check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL);
+
+ g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 1);
- /* Fail the last call. */
changed_was_called = FALSE;
fail_one_call ();
g_assert (changed_was_called);
@@ -151,11 +143,92 @@ test_fast (void)
check_and_free (dconf_client_read (client, "/test/value"), NULL);
check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL);
+ g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 0);
+
/* Cleanup */
g_signal_handlers_disconnect_by_func (client, changed, NULL);
g_object_unref (client);
}
+static gboolean changed_a, changed_b, changed_c;
+
+static void
+coalesce_changed (DConfClient *client,
+ const gchar *prefix,
+ const gchar * const *changes,
+ const gchar *tag,
+ gpointer user_data)
+{
+ changed_a = g_str_equal (prefix, "/test/a") || g_strv_contains (changes, "a");
+ changed_b = g_str_equal (prefix, "/test/b") || g_strv_contains (changes, "b");
+ changed_c = g_str_equal (prefix, "/test/c") || g_strv_contains (changes, "c");
+}
+
+static void
+test_coalesce (void)
+{
+ gint i, a, b, c;
+ gboolean should_change_a, should_change_b, should_change_c;
+ g_autoptr(DConfClient) client = NULL;
+
+ gint changes[][3] = {
+ {1, 0, 0},
+ {1, 1, 1},
+ {0, 1, 1},
+ {0, 0, 1},
+ {0, 0, 0},
+ {1, 0, 0},
+ {1, 0, 0},
+ };
+
+ client = dconf_client_new ();
+ g_signal_connect (client, "changed", G_CALLBACK (coalesce_changed), NULL);
+
+ a = b = c = 0;
+
+ for (i = 0; i != G_N_ELEMENTS (changes); ++i)
+ {
+ g_autoptr(DConfChangeset) changeset = NULL;
+
+ should_change_a = changes[i][0];
+ should_change_b = changes[i][1];
+ should_change_c = changes[i][2];
+
+ changeset = dconf_changeset_new ();
+
+ if (should_change_a)
+ dconf_changeset_set (changeset, "/test/a", g_variant_new_int32 (++a));
+ if (should_change_b)
+ dconf_changeset_set (changeset, "/test/b", g_variant_new_int32 (++b));
+ if (should_change_c)
+ dconf_changeset_set (changeset, "/test/c", g_variant_new_int32 (++c));
+
+ changed_a = changed_b = changed_c = FALSE;
+
+ g_assert_true (dconf_client_change_fast (client, changeset, NULL));
+
+ /* Notifications should be only about keys we have just written. */
+ g_assert_cmpint (should_change_a, ==, changed_a);
+ g_assert_cmpint (should_change_b, ==, changed_b);
+ g_assert_cmpint (should_change_c, ==, changed_c);
+
+ /* We should see value from the most recent write or NULL if we haven't written it yet. */
+ check_and_free (dconf_client_read (client, "/test/a"), a == 0 ? NULL : g_variant_new_int32 (a));
+ check_and_free (dconf_client_read (client, "/test/b"), b == 0 ? NULL : g_variant_new_int32 (b));
+ check_and_free (dconf_client_read (client, "/test/c"), c == 0 ? NULL : g_variant_new_int32 (c));
+ }
+
+ dconf_mock_dbus_async_reply (g_variant_new ("(s)", "1"), NULL);
+ dconf_mock_dbus_async_reply (g_variant_new ("(s)", "2"), NULL);
+
+ /* There should be no more requests since all but first have been
+ * coalesced together. */
+ dconf_mock_dbus_assert_no_async ();
+
+ /* Cleanup */
+ g_signal_handlers_disconnect_by_func (client, changed, NULL);
+}
+
int
main (int argc, char **argv)
{
@@ -167,6 +240,7 @@ main (int argc, char **argv)
g_test_add_func ("/client/lifecycle", test_lifecycle);
g_test_add_func ("/client/basic-fast", test_fast);
+ g_test_add_func ("/client/coalesce", test_coalesce);
return g_test_run ();
}