Fix replicator cluster stability race condition
The replicator clustering module is in charge of keeping track of when the cluster is stable or unstable. A cluster is said to be "stable" if no nodes have been added or removed for some period of time. The replicator document processor uses the cluster state to determine what to do with document updates as they come in. If a document update arrives while the cluster is unstable, the update is skipped. The idea is that when the cluster becomes stable again, all the documents will be rescanned anyway. In order for document updates to not be dropped, there is an implicit constraint when the cluster becomes stable -- after the `couch_replicator_notifier` gen_event broadcasts the `{cluster, stable}` event, any subsequent call to `couch_replicator_clustering:is_stable()` must return `true`. If that's not the case then this sequence of events is possible: 1. `mem3_cluster` process calls the `cluster_stable` callback 2. `couch_replicator_notifier` broadcasts `{cluster, stable}` event 3. `couch_replicator_doc_processor` starts processing documents 4. On first document update `couch_replicator_clustering:is_stable()` is `false`, because that gen_server wasn't notified yet. 5. Document update is dropped. 6. There won't be any rescans until cluster membership is changed again. To fix this, switch to setting the stable state first via a `gen_server` call. This way, after the `{cluster, stable}` event has been broadcast, `is_stable()` is guaranteed to return `true`. Note: This issue is mostly theoretical. It was noticed when examining the code related to another bug. The chance of the clustering process going to sleep immediately after the gen_event broadcast, then not handling the cast long enough for a document to be processed by the doc processor, is pretty low in practice.
@@ -114,17 +114,17 @@ link_cluster_event_listener(Mod, Fun, Args)
% Mem3 cluster callbacks
cluster_unstable(Server) ->
+ ok = gen_server:call(Server, set_unstable),
couch_replicator_notifier:notify({cluster, unstable}),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
couch_log:notice("~s : cluster unstable", [?MODULE]),
- gen_server:cast(Server, cluster_unstable),
cluster_stable(Server) ->
+ ok = gen_server:call(Server, set_stable),
couch_replicator_notifier:notify({cluster, stable}),
couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
couch_log:notice("~s : cluster stable", [?MODULE]),
- gen_server:cast(Server, cluster_stable),
@@ -147,18 +147,18 @@ terminate(_Reason, _State) ->
handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
- {reply, IsStable, State}.
+ {reply, IsStable, State};
+handle_call(set_stable, _From, State) ->
+ {reply, ok, State#state{cluster_stable = true}};
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
- ok = mem3_cluster:set_period(Pid, Period),
- {noreply, State};
+handle_call(set_unstable, _From, State) ->
+ {reply, ok, State#state{cluster_stable = false}}.
-handle_cast(cluster_stable, State) ->
- {noreply, State#state{cluster_stable = true}};
-handle_cast(cluster_unstable, State) ->
- {noreply, State#state{cluster_stable = false}}.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+ ok = mem3_cluster:set_period(Pid, Period),
+ {noreply, State}.
handle_info(restart_config_listener, State) ->
@@ -193,3 +193,56 @@ owner_int(ShardName, DocId) ->
Shards = mem3:shards(DbName, DocId),
Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
mem3:owner(DbName, DocId, Nodes).
+replicator_clustering_test_() ->
+ {
+ foreach,
+ fun setup/0,
+ fun teardown/1,
+ [
+ t_stable_callback(),
+ t_unstable_callback()
+ ]
+ }.
+t_stable_callback() ->
+ ?_test(begin
+ ?assertEqual(false, is_stable()),
+ cluster_stable(whereis(?MODULE)),
+ ?assertEqual(true, is_stable())
+ end).
+t_unstable_callback() ->
+ ?_test(begin
+ cluster_stable(whereis(?MODULE)),
+ ?assertEqual(true, is_stable()),
+ cluster_unstable(whereis(?MODULE)),
+ ?assertEqual(false, is_stable())
+ end).
+setup() ->
+ meck:expect(couch_log, notice, 2, ok),
+ meck:expect(config, get, fun(_, _, Default) -> Default end),
+ meck:expect(config, listen_for_changes, 2, ok),
+ meck:expect(couch_stats, update_gauge, 2, ok),
+ meck:expect(couch_replicator_notifier, notify, 1, ok),
+ {ok, Pid} = start_link(),
+ Pid.
+teardown(Pid) ->
+ unlink(Pid),
+ exit(Pid, kill),
+ meck:unload().