diff options
author | Daniel Playfair Cal <daniel.playfair.cal@gmail.com> | 2018-11-16 23:42:08 +0000 |
---|---|---|
committer | Daniel Playfair Cal <daniel.playfair.cal@gmail.com> | 2018-11-16 23:42:08 +0000 |
commit | 0b440a2e8689dcd7bdf70b69aff6125751b936a0 (patch) | |
tree | bafb0fc9bfd0af472a61b63eb73ad466919d5b48 | |
parent | 7a033fa8b762c2a247e32317ffd436dd6403c942 (diff) | |
parent | 15f383599c606c95413c0a4ed382c186b1f42ac3 (diff) | |
download | dconf-0b440a2e8689dcd7bdf70b69aff6125751b936a0.tar.gz |
Merge branch 'coalesce-writes' into 'master'
Coalesce pending writes into a single changeset
See merge request GNOME/dconf!30
-rw-r--r-- | engine/dconf-engine.c | 206 | ||||
-rw-r--r-- | tests/client.c | 114 |
2 files changed, 157 insertions, 163 deletions
diff --git a/engine/dconf-engine.c b/engine/dconf-engine.c index 06d7dd6..18b8aa5 100644 --- a/engine/dconf-engine.c +++ b/engine/dconf-engine.c @@ -75,36 +75,15 @@ * changed but then quickly changed back again by some external * agent. * - * In fast mode we have to do some management of the queue. If we - * immediately put all requests "in flight" then we can end up in a - * situation where the application writes many values for the same key - * and the service is kept (needlessly) busy writing over and over to - * the same key for some time after the requests stop coming in. + * In fast mode if we were to immediately put all requests "in flight", + * then we could end up in a situation where the service is kept + * (needlessly) busy rewriting the database over and over again after a + * sequence of fast changes on the client side. * - * If we limit the number of in-flight requests and put the other ones - * into a pending queue then we can perform merging of similar changes. - * If we notice that an item in the pending queue writes to the same - * keys as the newly-added request then we can simply drop the existing - * request (since its effect will be nullified by the new request). - * - * We want to keep the number of in-flight requests low in order to - * maximise our chance of dropping pending items, but we probably want - * it higher than 1 so that we can pipeline to hide latency. - * - * In order to minimise complexity, all changes go first to the pending - * queue. Changes are dispatched from the pending queue (and moved to - * the in-flight queue) when the number of requests in-flight is lower - * than the maximum. - * - * For both 'in_flight' and 'pending' queues we push to the tail and pop - * from the head. This puts the first operation on the head and the - * most recent operation on the tail. - * - * Since new operation go first to the pending queue, we find the most - * recent operations at the tail of that queue. Since we want to return - * the most-recently written value, we therefore scan for values - * starting at the tail of the pending queue and ending at the head of - * the in-flight queue. + * To avoid the issue we limit the number of in-flight requests to one. + * If a request is already in flight, subsequent changes are merged into + * a single aggregated pending change to be submitted as the next write + * after the in-flight request completes. * * NB: I tell a lie. Async is not supported yet. * @@ -140,8 +119,9 @@ * 'sources' array itself (and 'n_sources') are set at construction and * never change after that. * - * The second lock (queue_lock) protects the various queues that are - * used to implement the "fast" writes described above. + * The second lock (queue_lock) protects the queue (represented with two + * fields pending and in_flight) used to implement the "fast" writes + * described above. * * The third lock (subscription_count_lock) protects the two hash tables * that are used to keep track of the number of subscriptions held by @@ -154,8 +134,6 @@ * sources_lock or queue_lock */ -#define MAX_IN_FLIGHT 2 - static GSList *dconf_engine_global_list; static GMutex dconf_engine_global_lock; @@ -171,9 +149,9 @@ struct _DConfEngine gint n_sources; GMutex queue_lock; /* This lock is for pending, in_flight, queue_cond */ - GCond queue_cond; /* Signalled when the queues empty */ - GQueue pending; /* DConfChangeset */ - GQueue in_flight; /* DConfChangeset */ + GCond queue_cond; /* Signalled when there are neither in-flight nor pending changes. */ + DConfChangeset *pending; /* Yet to be sent on the wire. */ + DConfChangeset *in_flight; /* Already sent but awaiting response. */ gchar *last_handled; /* reply tag from last item in in_flight */ @@ -229,13 +207,13 @@ dconf_engine_release_sources (DConfEngine *engine) } static void -dconf_engine_lock_queues (DConfEngine *engine) +dconf_engine_lock_queue (DConfEngine *engine) { g_mutex_lock (&engine->queue_lock); } static void -dconf_engine_unlock_queues (DConfEngine *engine) +dconf_engine_unlock_queue (DConfEngine *engine) { g_mutex_unlock (&engine->queue_lock); } @@ -402,11 +380,8 @@ dconf_engine_unref (DConfEngine *engine) g_free (engine->last_handled); - while (!g_queue_is_empty (&engine->pending)) - dconf_changeset_unref ((DConfChangeset *) g_queue_pop_head (&engine->pending)); - - while (!g_queue_is_empty (&engine->in_flight)) - dconf_changeset_unref ((DConfChangeset *) g_queue_pop_head (&engine->in_flight)); + g_clear_pointer (&engine->pending, dconf_changeset_unref); + g_clear_pointer (&engine->in_flight, dconf_changeset_unref); for (i = 0; i < engine->n_sources; i++) dconf_engine_source_free (engine->sources[i]); @@ -730,15 +705,18 @@ dconf_engine_read (DConfEngine *engine, */ if (!found_key) { - dconf_engine_lock_queues (engine); + dconf_engine_lock_queue (engine); - /* Check the pending queue first because those were submitted + /* Check the pending first because those were submitted * more recently. */ - found_key = dconf_engine_find_key_in_queue (&engine->pending, key, &value) || - dconf_engine_find_key_in_queue (&engine->in_flight, key, &value); + if (engine->pending != NULL) + found_key = dconf_changeset_get (engine->pending, key, &value); + + if (!found_key && engine->in_flight != NULL) + found_key = dconf_changeset_get (engine->in_flight, key, &value); - dconf_engine_unlock_queues (engine); + dconf_engine_unlock_queue (engine); } /* Step 4. Check the first source. */ @@ -1137,19 +1115,17 @@ dconf_engine_prepare_change (DConfEngine *engine, (GDestroyNotify) g_variant_unref, g_variant_ref_sink (serialised)); } -/* This function promotes changes from the pending queue to the - * in-flight queue by sending the appropriate D-Bus message. +/* This function promotes the pending changeset to become the in-flight + * changeset by sending the appropriate D-Bus message. * - * Of course, this is only possible when there are pending items and - * room in the in-flight queue. For this reason, this function gets - * called in two situations: + * Of course, this is only possible when there is a pending changeset + * and no changeset is in-flight already. For this reason, this function + * gets called in two situations: * - * - an item has been added to the pending queue (due to an API call) + * - when there is a new pending changeset (due to an API call) * - * - an item has been removed from the inflight queue (due to a D-Bus + * - when in-flight changeset had been delivered (due to a D-Bus * reply having been received) - * - * It will move a maximum of one item. */ static void dconf_engine_manage_queue (DConfEngine *engine); @@ -1172,42 +1148,16 @@ dconf_engine_change_completed (DConfEngine *engine, const GError *error) { OutstandingChange *oc = handle; + DConfChangeset *expected; - dconf_engine_lock_queues (engine); - - /* D-Bus guarantees ordered delivery of messages. - * - * The dconf-service handles requests in-order. - * - * The reply we just received should therefore be at the head of - * our 'in flight' queue. - * - * Due to https://bugs.freedesktop.org/show_bug.cgi?id=59780 it is - * possible that we receive an out-of-sequence error message, however, - * so only assume that messages are in-order for positive replies. - */ - if (reply) - { - DConfChangeset *expected; - - expected = g_queue_pop_head (&engine->in_flight); - g_assert (expected && oc->change == expected); - } - else - { - gboolean found; - - g_assert (error != NULL); + dconf_engine_lock_queue (engine); - found = g_queue_remove (&engine->in_flight, oc->change); - g_assert (found); - } + expected = g_steal_pointer (&engine->in_flight); + g_assert (expected && oc->change == expected); - /* We just popped a change from the in-flight queue, possibly - * making room for another to be added. Check that. - */ + /* Another request could be sent now. Check for pending changes. */ dconf_engine_manage_queue (engine); - dconf_engine_unlock_queues (engine); + dconf_engine_unlock_queue (engine); /* Deal with the reply we got. */ if (reply) @@ -1249,7 +1199,7 @@ dconf_engine_change_completed (DConfEngine *engine, static void dconf_engine_manage_queue (DConfEngine *engine) { - if (!g_queue_is_empty (&engine->pending) && g_queue_get_length (&engine->in_flight) < MAX_IN_FLIGHT) + if (engine->pending != NULL && engine->in_flight == NULL) { OutstandingChange *oc; GVariant *parameters; @@ -1257,7 +1207,7 @@ dconf_engine_manage_queue (DConfEngine *engine) oc = dconf_engine_call_handle_new (engine, dconf_engine_change_completed, G_VARIANT_TYPE ("(s)"), sizeof (OutstandingChange)); - oc->change = g_queue_pop_head (&engine->pending); + oc->change = engine->in_flight = g_steal_pointer (&engine->pending); parameters = dconf_engine_prepare_change (engine, oc->change); @@ -1266,16 +1216,14 @@ dconf_engine_manage_queue (DConfEngine *engine) engine->sources[0]->object_path, "ca.desrt.dconf.Writer", "Change", parameters, &oc->handle, NULL); - - g_queue_push_tail (&engine->in_flight, oc->change); } - if (g_queue_is_empty (&engine->in_flight)) + if (engine->in_flight == NULL) { /* The in-flight queue should not be empty if we have changes * pending... */ - g_assert (g_queue_is_empty (&engine->pending)); + g_assert (engine->pending == NULL); g_cond_broadcast (&engine->queue_cond); } @@ -1321,7 +1269,6 @@ dconf_engine_change_fast (DConfEngine *engine, gpointer origin_tag, GError **error) { - GList *node; g_debug ("change_fast"); if (dconf_changeset_is_empty (changeset)) return TRUE; @@ -1331,50 +1278,23 @@ dconf_engine_change_fast (DConfEngine *engine, dconf_changeset_seal (changeset); - /* Check for duplicates in the pending queue. - * - * Note: order doesn't really matter here since "similarity" is an - * equivalence class and we've ensured that there are no pairwise - * similar changes in the queue already (ie: at most we will have only - * one similar item to the one we are adding). - */ - dconf_engine_lock_queues (engine); + dconf_engine_lock_queue (engine); - for (node = g_queue_peek_head_link (&engine->pending); node; node = node->next) - { - DConfChangeset *queued_change = node->data; + /* The pending changeset is kept unsealed so that it can be modified + * by later calls to this functions. It wouldn't be a good idea to + * repurpose the incoming changeset for this role, so create a new + * one if necessary. */ + if (engine->pending == NULL) + engine->pending = dconf_changeset_new (); - if (dconf_changeset_is_similar_to (changeset, queued_change)) - { - /* We found a similar item in the queue. - * - * We want to drop the one that's in the queue already since - * we want our new (more recent) change to take precedence. - * - * The pending queue owned the changeset, so free it. - */ - g_queue_delete_link (&engine->pending, node); - dconf_changeset_unref (queued_change); + dconf_changeset_change (engine->pending, changeset); - /* There will only have been one, so stop looking. */ - break; - } - } - - /* No matter what we're going to queue up this change, so put it in - * the pending queue now. - * - * There may be room in the in_flight queue, so we try to manage the - * queue right away in order to try to promote it there (which causes - * the D-Bus message to actually be sent). - * - * The change might get tossed before being sent if the loop above - * finds it on a future call. - */ - g_queue_push_tail (&engine->pending, dconf_changeset_ref (changeset)); + /* There might be no in-flight request yet, so we try to manage the + * queue right away in order to try to promote pending changes there + * (which causes the D-Bus message to actually be sent). */ dconf_engine_manage_queue (engine); - dconf_engine_unlock_queues (engine); + dconf_engine_unlock_queue (engine); /* Emit the signal after dropping the lock to avoid deadlock on re-entry. */ dconf_engine_emit_changes (engine, changeset, origin_tag); @@ -1500,7 +1420,7 @@ dconf_engine_handle_dbus_signal (GBusType type, /* It's possible that this incoming change notify is for a * change that we already announced to the client when we - * placed it in the pending queue. + * placed it in the queue. * * Check last_handled to determine if we should ignore it. */ @@ -1555,12 +1475,12 @@ dconf_engine_has_outstanding (DConfEngine *engine) { gboolean has; - /* The in-flight queue will never be empty unless the pending queue is + /* The in-flight will never be empty unless the pending is * also empty, so we only really need to check one of them... */ - dconf_engine_lock_queues (engine); - has = !g_queue_is_empty (&engine->in_flight); - dconf_engine_unlock_queues (engine); + dconf_engine_lock_queue (engine); + has = engine->in_flight != NULL; + dconf_engine_unlock_queue (engine); return has; } @@ -1569,8 +1489,8 @@ void dconf_engine_sync (DConfEngine *engine) { g_debug ("sync"); - dconf_engine_lock_queues (engine); - while (!g_queue_is_empty (&engine->in_flight)) + dconf_engine_lock_queue (engine); + while (engine->in_flight != NULL) g_cond_wait (&engine->queue_cond, &engine->queue_lock); - dconf_engine_unlock_queues (engine); + dconf_engine_unlock_queue (engine); } diff --git a/tests/client.c b/tests/client.c index 4727e0c..1773ed1 100644 --- a/tests/client.c +++ b/tests/client.c @@ -58,7 +58,7 @@ queue_up_100_writes (DConfClient *client) gint i; /* We send 100 writes, letting them pile up. - * At no time should there be more than 2 writes on the wire. + * At no time should there be more than one write on the wire. */ for (i = 0; i < 100; i++) { @@ -71,7 +71,7 @@ queue_up_100_writes (DConfClient *client) check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL); } - g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), ==, 2); + g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), ==, 1); } static void @@ -108,7 +108,6 @@ static void test_fast (void) { DConfClient *client; - gint i; g_log_set_writer_func (log_writer_cb, NULL, NULL); @@ -119,30 +118,23 @@ test_fast (void) /* Start indicating that the writes failed. * - * For the first failures, we should continue to see the most recently - * written value (99). - * - * After we fail that last one, we should see NULL returned. + * Because of the pending-merge logic, we should only have had to fail two calls. * * Each time, we should see a change notify. */ - for (i = 0; g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles) > 1; i++) - { - changed_was_called = FALSE; - fail_one_call (); - g_assert (changed_was_called); + g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 1); - check_and_free (dconf_client_read (client, "/test/value"), g_variant_new_int32 (99)); - check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL); - } + changed_was_called = FALSE; + fail_one_call (); + g_assert (changed_was_called); - /* Because of the pending-merging logic, we should only have had to - * fail two calls. - */ - g_assert (i == 2); + /* For the first failure, we should continue to see the most recently written value (99) */ + check_and_free (dconf_client_read (client, "/test/value"), g_variant_new_int32 (99)); + check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL); + + g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 1); - /* Fail the last call. */ changed_was_called = FALSE; fail_one_call (); g_assert (changed_was_called); @@ -151,11 +143,92 @@ test_fast (void) check_and_free (dconf_client_read (client, "/test/value"), NULL); check_and_free (dconf_client_read_full (client, "/test/value", DCONF_READ_DEFAULT_VALUE, NULL), NULL); + g_assert_cmpint (g_queue_get_length (&dconf_mock_dbus_outstanding_call_handles), == , 0); + /* Cleanup */ g_signal_handlers_disconnect_by_func (client, changed, NULL); g_object_unref (client); } +static gboolean changed_a, changed_b, changed_c; + +static void +coalesce_changed (DConfClient *client, + const gchar *prefix, + const gchar * const *changes, + const gchar *tag, + gpointer user_data) +{ + changed_a = g_str_equal (prefix, "/test/a") || g_strv_contains (changes, "a"); + changed_b = g_str_equal (prefix, "/test/b") || g_strv_contains (changes, "b"); + changed_c = g_str_equal (prefix, "/test/c") || g_strv_contains (changes, "c"); +} + +static void +test_coalesce (void) +{ + gint i, a, b, c; + gboolean should_change_a, should_change_b, should_change_c; + g_autoptr(DConfClient) client = NULL; + + gint changes[][3] = { + {1, 0, 0}, + {1, 1, 1}, + {0, 1, 1}, + {0, 0, 1}, + {0, 0, 0}, + {1, 0, 0}, + {1, 0, 0}, + }; + + client = dconf_client_new (); + g_signal_connect (client, "changed", G_CALLBACK (coalesce_changed), NULL); + + a = b = c = 0; + + for (i = 0; i != G_N_ELEMENTS (changes); ++i) + { + g_autoptr(DConfChangeset) changeset = NULL; + + should_change_a = changes[i][0]; + should_change_b = changes[i][1]; + should_change_c = changes[i][2]; + + changeset = dconf_changeset_new (); + + if (should_change_a) + dconf_changeset_set (changeset, "/test/a", g_variant_new_int32 (++a)); + if (should_change_b) + dconf_changeset_set (changeset, "/test/b", g_variant_new_int32 (++b)); + if (should_change_c) + dconf_changeset_set (changeset, "/test/c", g_variant_new_int32 (++c)); + + changed_a = changed_b = changed_c = FALSE; + + g_assert_true (dconf_client_change_fast (client, changeset, NULL)); + + /* Notifications should be only about keys we have just written. */ + g_assert_cmpint (should_change_a, ==, changed_a); + g_assert_cmpint (should_change_b, ==, changed_b); + g_assert_cmpint (should_change_c, ==, changed_c); + + /* We should see value from the most recent write or NULL if we haven't written it yet. */ + check_and_free (dconf_client_read (client, "/test/a"), a == 0 ? NULL : g_variant_new_int32 (a)); + check_and_free (dconf_client_read (client, "/test/b"), b == 0 ? NULL : g_variant_new_int32 (b)); + check_and_free (dconf_client_read (client, "/test/c"), c == 0 ? NULL : g_variant_new_int32 (c)); + } + + dconf_mock_dbus_async_reply (g_variant_new ("(s)", "1"), NULL); + dconf_mock_dbus_async_reply (g_variant_new ("(s)", "2"), NULL); + + /* There should be no more requests since all but first have been + * coalesced together. */ + dconf_mock_dbus_assert_no_async (); + + /* Cleanup */ + g_signal_handlers_disconnect_by_func (client, changed, NULL); +} + int main (int argc, char **argv) { @@ -167,6 +240,7 @@ main (int argc, char **argv) g_test_add_func ("/client/lifecycle", test_lifecycle); g_test_add_func ("/client/basic-fast", test_fast); + g_test_add_func ("/client/coalesce", test_coalesce); return g_test_run (); } |