summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Vatamaniuc <vatamane@apache.org>2017-11-20 12:29:43 -0500
committerNick Vatamaniuc <nickva@users.noreply.github.com>2017-11-22 09:51:07 -0500
commitbdaeaff948b90d30686104349b8f01c19f0c482b (patch)
tree10191d3f682a354806da2bb30fb6ec89ac9b384f
parent7c789228bebfab7ea07f4181541fa57e1e744208 (diff)
downloadcouchdb-bdaeaff948b90d30686104349b8f01c19f0c482b.tar.gz
Fix replicator cluster stability race condition
Replicator clustering module is in charge of keeping track of when the cluster is stable or unstable. A cluster is said to be "stable" if there aren't any new nodes added or removed for some period of time. Replicator document processor uses cluster state to determine what to do with document updates as they come in. If a document update comes during a time when cluster is unstable, the update is skipped. The idea is that when cluster becomes stable again, all the documents will be rescanned anyway. In order for document updates to not be dropped, there is an implicit constraint when cluster becomes stable -- after(couch_replicator_notifier gen_event broadcasts `{cluster, stable}` event, any subsequent calls to `couch_replicator_clustering:is_stable()` would return `true`. If that's not the case then this sequence of events is possible: 1. `mem3_cluster` process calls the `cluster_stable` callback 2. `couch_replicator_notifier` broadcasts `{cluster, stable}` event 3. `couch_replicator_doc_processor` starts processing documents 4. On first document update `couch_replicator_clustering:is_stable()` is `false`, because that gen_server wasn't notifier yet. 5. Document update is dropped. 6. There won't be any rescans until cluster membership is changed again. To fix this, switch to setting stable state first via a `gen_server` call. This way after the `{cluster, stable}` has been called, `is_stable()` is guaranteed to return `true`. Note: This issue is mostly theoretical. It was noticed when examining the code related to another bug. The chance of the clustering process going to sleep immediately after gen_event broadcast, then not handling the cast long enough for a document to be processed by doc processor is pretty low in practice.
-rw-r--r--src/couch_replicator/src/couch_replicator_clustering.erl73
1 files changed, 63 insertions, 10 deletions
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 3d5229b9f..a7f7573b6 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -114,17 +114,17 @@ link_cluster_event_listener(Mod, Fun, Args)
% Mem3 cluster callbacks
cluster_unstable(Server) ->
+ ok = gen_server:call(Server, set_unstable),
couch_replicator_notifier:notify({cluster, unstable}),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
couch_log:notice("~s : cluster unstable", [?MODULE]),
- gen_server:cast(Server, cluster_unstable),
Server.
cluster_stable(Server) ->
+ ok = gen_server:call(Server, set_stable),
couch_replicator_notifier:notify({cluster, stable}),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
couch_log:notice("~s : cluster stable", [?MODULE]),
- gen_server:cast(Server, cluster_stable),
Server.
@@ -147,18 +147,18 @@ terminate(_Reason, _State) ->
handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
- {reply, IsStable, State}.
+ {reply, IsStable, State};
+handle_call(set_stable, _From, State) ->
+ {reply, ok, State#state{cluster_stable = true}};
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
- ok = mem3_cluster:set_period(Pid, Period),
- {noreply, State};
+handle_call(set_unstable, _From, State) ->
+ {reply, ok, State#state{cluster_stable = false}}.
-handle_cast(cluster_stable, State) ->
- {noreply, State#state{cluster_stable = true}};
-handle_cast(cluster_unstable, State) ->
- {noreply, State#state{cluster_stable = false}}.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+ ok = mem3_cluster:set_period(Pid, Period),
+ {noreply, State}.
handle_info(restart_config_listener, State) ->
@@ -193,3 +193,56 @@ owner_int(ShardName, DocId) ->
Shards = mem3:shards(DbName, DocId),
Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
mem3:owner(DbName, DocId, Nodes).
+
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+replicator_clustering_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_stable_callback(),
+ t_unstable_callback()
+ ]
+ }.
+
+
+t_stable_callback() ->
+ ?_test(begin
+ ?assertEqual(false, is_stable()),
+ cluster_stable(whereis(?MODULE)),
+ ?assertEqual(true, is_stable())
+ end).
+
+
+t_unstable_callback() ->
+ ?_test(begin
+ cluster_stable(whereis(?MODULE)),
+ ?assertEqual(true, is_stable()),
+ cluster_unstable(whereis(?MODULE)),
+ ?assertEqual(false, is_stable())
+ end).
+
+
+setup() ->
+ meck:expect(couch_log, notice, 2, ok),
+ meck:expect(config, get, fun(_, _, Default) -> Default end),
+ meck:expect(config, listen_for_changes, 2, ok),
+ meck:expect(couch_stats, update_gauge, 2, ok),
+ meck:expect(couch_replicator_notifier, notify, 1, ok),
+ {ok, Pid} = start_link(),
+ Pid.
+
+
+teardown(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill),
+ meck:unload().
+
+-endif.