author     Nick Vatamaniuc <vatamane@apache.org>              2017-11-20 12:29:43 -0500
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2017-11-22 09:51:07 -0500
commit     bdaeaff948b90d30686104349b8f01c19f0c482b (patch)
tree       10191d3f682a354806da2bb30fb6ec89ac9b384f
parent     7c789228bebfab7ea07f4181541fa57e1e744208 (diff)
download   couchdb-bdaeaff948b90d30686104349b8f01c19f0c482b.tar.gz
Fix replicator cluster stability race condition
The replicator clustering module is in charge of tracking whether the cluster
is stable or unstable. A cluster is considered "stable" if no nodes have been
added or removed for some period of time.
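
A minimal self-contained sketch of the shape of this API, inferred from the diff further down; the module name `clustering_flag_sketch` is hypothetical, and the real `couch_replicator_clustering` gen_server carries additional state and responsibilities (mem3 membership callbacks, a config listener, ownership checks):

```erlang
%% Hypothetical sketch: a gen_server that exposes a single stability flag
%% through a synchronous is_stable/0 call, as couch_replicator_clustering does.
-module(clustering_flag_sketch).
-behaviour(gen_server).

-export([start_link/0, is_stable/0]).
-export([init/1, handle_call/3, handle_cast/2]).

-record(state, {cluster_stable = false}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Consumers ask the server for the current flag.
is_stable() ->
    gen_server:call(?MODULE, is_stable).

init([]) ->
    {ok, #state{}}.

handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
    {reply, IsStable, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.
```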
The replicator document processor uses the cluster state to decide what to do
with document updates as they come in. If an update arrives while the cluster
is unstable, it is skipped. The idea is that once the cluster becomes stable
again, all the documents will be rescanned anyway.
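
A hedged sketch of that gating decision; this is not the actual `couch_replicator_doc_processor` code, and `process_change/2` and `handle_doc_update/2` are hypothetical names used only to illustrate the check:

```erlang
%% Illustration only: skip the update while the cluster is unstable and rely
%% on the post-stabilization rescan to pick the document up again.
process_change(DbName, DocId) ->
    case couch_replicator_clustering:is_stable() of
        true ->
            handle_doc_update(DbName, DocId);  % hypothetical helper
        false ->
            skipped
    end.
```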
For document updates not to be dropped, there is an implicit constraint on how
the cluster becomes stable: after the `couch_replicator_notifier` gen_event
broadcasts the `{cluster, stable}` event, any subsequent call to
`couch_replicator_clustering:is_stable()` must return `true`. If that does not
hold, the following sequence of events is possible:
1. `mem3_cluster` process calls the `cluster_stable` callback
2. `couch_replicator_notifier` broadcasts `{cluster, stable}` event
3. `couch_replicator_doc_processor` starts processing documents
4. On the first document update, `couch_replicator_clustering:is_stable()`
returns `false`, because that gen_server has not been notified yet.
5. The document update is dropped.
6. No rescan happens until cluster membership changes again (the pre-fix
ordering responsible for this window is sketched below).
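
The pre-fix ordering that opens this window, taken from the lines removed in the diff further down, with comments added here to map onto the steps above:

```erlang
cluster_stable(Server) ->
    % Step 2: listeners receive {cluster, stable} and may start work immediately
    couch_replicator_notifier:notify({cluster, stable}),
    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
    couch_log:notice("~s : cluster stable", [?MODULE]),
    % Step 4: the flag only flips once this asynchronous cast is handled, so
    % is_stable() can still return false in the meantime
    gen_server:cast(Server, cluster_stable),
    Server.
```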
To fix this, switch to setting the stable state first, via a synchronous
`gen_server` call. This way, by the time the `{cluster, stable}` event has been
broadcast, `is_stable()` is guaranteed to return `true`.
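
This is the post-fix version of the same callback, from the diff below: the synchronous call returns only after the clustering gen_server has flipped its flag, so the event is broadcast strictly after `is_stable()` starts returning `true`.

```erlang
cluster_stable(Server) ->
    % Synchronous: does not return until cluster_stable = true has been set
    ok = gen_server:call(Server, set_stable),
    couch_replicator_notifier:notify({cluster, stable}),
    couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
    couch_log:notice("~s : cluster stable", [?MODULE]),
    Server.
```

The `cluster_unstable/1` callback is changed in the same way, using a `set_unstable` call.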
Note: This issue is mostly theoretical; it was noticed while examining the code
in connection with another bug. In practice, the chance of the clustering
process being descheduled immediately after the gen_event broadcast and then
not handling the cast for long enough that the doc processor processes a
document is quite low.
src/couch_replicator/src/couch_replicator_clustering.erl | 73
1 file changed, 63 insertions(+), 10 deletions(-)
diff --git a/src/couch_replicator/src/couch_replicator_clustering.erl b/src/couch_replicator/src/couch_replicator_clustering.erl
index 3d5229b9f..a7f7573b6 100644
--- a/src/couch_replicator/src/couch_replicator_clustering.erl
+++ b/src/couch_replicator/src/couch_replicator_clustering.erl
@@ -114,17 +114,17 @@ link_cluster_event_listener(Mod, Fun, Args)
 
 % Mem3 cluster callbacks
 
 cluster_unstable(Server) ->
+    ok = gen_server:call(Server, set_unstable),
     couch_replicator_notifier:notify({cluster, unstable}),
     couch_stats:update_gauge([couch_replicator, cluster_is_stable], 0),
     couch_log:notice("~s : cluster unstable", [?MODULE]),
-    gen_server:cast(Server, cluster_unstable),
     Server.
 
 
 cluster_stable(Server) ->
+    ok = gen_server:call(Server, set_stable),
     couch_replicator_notifier:notify({cluster, stable}),
     couch_stats:update_gauge([couch_replicator, cluster_is_stable], 1),
     couch_log:notice("~s : cluster stable", [?MODULE]),
-    gen_server:cast(Server, cluster_stable),
     Server.
 
 
@@ -147,18 +147,18 @@ terminate(_Reason, _State) ->
 
 
 handle_call(is_stable, _From, #state{cluster_stable = IsStable} = State) ->
-    {reply, IsStable, State}.
+    {reply, IsStable, State};
 
+handle_call(set_stable, _From, State) ->
+    {reply, ok, State#state{cluster_stable = true}};
 
-handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
-    ok = mem3_cluster:set_period(Pid, Period),
-    {noreply, State};
+handle_call(set_unstable, _From, State) ->
+    {reply, ok, State#state{cluster_stable = false}}.
 
-handle_cast(cluster_stable, State) ->
-    {noreply, State#state{cluster_stable = true}};
 
-handle_cast(cluster_unstable, State) ->
-    {noreply, State#state{cluster_stable = false}}.
+handle_cast({set_period, Period}, #state{mem3_cluster_pid = Pid} = State) ->
+    ok = mem3_cluster:set_period(Pid, Period),
+    {noreply, State}.
 
 
 handle_info(restart_config_listener, State) ->
@@ -193,3 +193,56 @@ owner_int(ShardName, DocId) ->
     Shards = mem3:shards(DbName, DocId),
     Nodes = [N || #shard{node=N} <- Shards, lists:member(N, Live)],
     mem3:owner(DbName, DocId, Nodes).
+
+
+-ifdef(TEST).
+
+-include_lib("eunit/include/eunit.hrl").
+
+
+replicator_clustering_test_() ->
+    {
+        foreach,
+        fun setup/0,
+        fun teardown/1,
+        [
+            t_stable_callback(),
+            t_unstable_callback()
+        ]
+    }.
+
+
+t_stable_callback() ->
+    ?_test(begin
+        ?assertEqual(false, is_stable()),
+        cluster_stable(whereis(?MODULE)),
+        ?assertEqual(true, is_stable())
+    end).
+
+
+t_unstable_callback() ->
+    ?_test(begin
+        cluster_stable(whereis(?MODULE)),
+        ?assertEqual(true, is_stable()),
+        cluster_unstable(whereis(?MODULE)),
+        ?assertEqual(false, is_stable())
+    end).
+
+
+setup() ->
+    meck:expect(couch_log, notice, 2, ok),
+    meck:expect(config, get, fun(_, _, Default) -> Default end),
+    meck:expect(config, listen_for_changes, 2, ok),
+    meck:expect(couch_stats, update_gauge, 2, ok),
+    meck:expect(couch_replicator_notifier, notify, 1, ok),
+    {ok, Pid} = start_link(),
+    Pid.
+
+
+teardown(Pid) ->
+    unlink(Pid),
+    exit(Pid, kill),
+    meck:unload().
+
+-endif.
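
One hedged way to exercise the new fixture from an Erlang shell, assuming the module was compiled with the `TEST` macro defined so the `-ifdef(TEST)` block is included (the project's own build tooling may wrap this differently):

```erlang
%% Runs the foreach fixture above: setup/0 mocks the collaborators with meck,
%% the two ?_test generators check is_stable() around the callbacks, and
%% teardown/1 kills the server and unloads the mocks.
eunit:test(couch_replicator_clustering, [verbose]).
```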