author | Nick Vatamaniuc <vatamane@apache.org> | 2019-03-18 13:32:14 -0400
committer | Nick Vatamaniuc <nickva@users.noreply.github.com> | 2019-04-03 10:48:45 -0400
commit | d10b7955929c1581f192fc487840cbf5005f84c9 (patch)
tree | 7b66fab88eba4c39c1a95919792cb972be7157e7 /src/rexi
parent | 3227e61ebbe6ee15d122036ec85b57ebffeb062d (diff)
download | couchdb-d10b7955929c1581f192fc487840cbf5005f84c9.tar.gz
Uneven shard copy handling in mem3 and fabric
The introduction of shard splitting will eliminate the constraint that all
document copies are located in shards with the same range boundaries. That
assumption was implicit in the mem3 and fabric functions that handle shard
replacement, worker spawning, unpacking `_changes` update sequences, and a few
other tasks. This commit updates those places to handle the case where
document copies might live in different shard ranges.
A good place to start is the `mem3_util:get_ring()` function. It builds a
full, non-overlapping ring from a set of possibly overlapping shards.
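To make the idea concrete, here is a minimal, self-contained sketch of a ring
covering function. It is not the actual `mem3_util` code: the module name
`ring_demo`, the `{Begin, End}` integer-pair representation of shard ranges,
and the greedy walk are all illustrative assumptions.

```erlang
%% ring_demo.erl - an illustrative sketch only, not the real mem3_util code.
%% Shard ranges are modeled as {Begin, End} integer pairs over the full ring.
-module(ring_demo).
-export([get_ring/1, get_ring/2]).

-define(RING_END, 16#ffffffff).

%% Default sort: prefer longer ranges when several start at the same point.
get_ring(Ranges) ->
    get_ring(Ranges, fun({B1, E1}, {B2, E2}) -> {B1, -E1} =< {B2, -E2} end).

%% Return a list of ranges covering [0, ?RING_END] without gaps or overlaps,
%% or [] when no complete ring can be assembled from the input.
get_ring(Ranges, SortFun) ->
    walk(lists:sort(SortFun, Ranges), 0, []).

walk(_Ranges, Pos, Acc) when Pos > ?RING_END ->
    lists:reverse(Acc);
walk(Ranges, Pos, Acc) ->
    %% The first candidate starting exactly at Pos wins, so the sort
    %% function decides which copy of a range is preferred.
    case [R || {B, _} = R <- Ranges, B =:= Pos] of
        [{_, End} = R | _] -> walk(Ranges, End + 1, [R | Acc]);
        [] -> []
    end.
```

Given `[{16#0, 16#7fffffff}, {16#80000000, 16#ffffffff}, {16#0, 16#ffffffff}]`
the sketch returns the single full-range copy; drop the full-range copy and it
returns the two halves; drop one of the halves as well and it returns `[]`.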
`get_ring()` is used by almost everything else in this commit:
1) It's used when only a single copy of the data is needed, for example during
`_all_docs` or `_changes` processing.
2) It's used when checking whether progress is still possible after some nodes
have died. `get_ring()` returns `[]` when it cannot find a full ring, and that
result is used to indicate that progress is not possible.
3) During shard replacement. This is perhaps the most complicated case. During
replacement, besides finding a possible covering of the ring from the set of
shards, it is also desirable to find one that minimizes the number of workers
that have to be replaced. A neat trick used here is to provide `get_ring()`
with a custom sort function which prioritizes certain shard copies over
others. In the replacement case it prioritizes shards for which workers have
already been spawned (see the sketch after this list). In the default case
`get_ring()` prioritizes longer ranges over shorter ones, so, for example, to
cover the interval [00-ff] with either the [00-7f, 80-ff] or the [00-ff] shard
ranges, it will pick the single [00-ff] range instead of the [00-7f, 80-ff]
pair.
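To illustrate the replacement trick, the `ring_demo` sketch above can be fed a
custom comparator that ranks ranges with an already-spawned worker first. The
`Spawned` set here is a hypothetical stand-in for the coordinator's own worker
bookkeeping, not a real fabric data structure.

```erlang
%% Illustrative only: prefer ranges that already have a running worker.
replacement_demo() ->
    Spawned = sets:from_list([{16#0, 16#7fffffff}]),
    PreferSpawned = fun(R1, R2) ->
        %% false sorts before true, so negate membership to rank
        %% already-spawned ranges ahead of fresh ones.
        {not sets:is_element(R1, Spawned), R1} =<
            {not sets:is_element(R2, Spawned), R2}
    end,
    Shards = [{16#0, 16#7fffffff}, {16#80000000, 16#ffffffff},
              {16#0, 16#ffffffff}],
    %% Picks the two halves (a worker is already running on the first one)
    %% instead of the otherwise-preferred single full-range copy.
    ring_demo:get_ring(Shards, PreferSpawned).
```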
Co-authored-by: Paul J. Davis <davisp@apache.org>
Diffstat (limited to 'src/rexi')
-rw-r--r-- | src/rexi/src/rexi.erl | 14
-rw-r--r-- | src/rexi/src/rexi_server.erl | 28
2 files changed, 32 insertions, 10 deletions
```diff
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index f774dc9d4..21e2b5388 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -12,7 +12,7 @@
 
 -module(rexi).
 -export([start/0, stop/0, restart/0]).
--export([cast/2, cast/3, cast/4, kill/2]).
+-export([cast/2, cast/3, cast/4, kill/2, kill_all/1]).
 -export([reply/1, sync_reply/1, sync_reply/2]).
 -export([async_server_call/2, async_server_call/3]).
 -export([stream_init/0, stream_init/1]).
@@ -76,6 +76,18 @@ kill(Node, Ref) ->
     rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill, Ref})),
     ok.
 
+%% @doc Sends an async kill signal to the remote processes associated with Refs.
+%% No rexi_EXIT message will be sent.
+-spec kill_all([{node(), reference()}]) -> ok.
+kill_all(NodeRefs) when is_list(NodeRefs) ->
+    PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) ->
+        maps:update_with(Node, fun(Refs) -> [Ref | Refs] end, [Ref], Acc)
+    end, #{}, NodeRefs),
+    maps:map(fun(Node, Refs) ->
+        rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill_all, Refs}))
+    end, PerNodeMap),
+    ok.
+
 %% @equiv async_server_call(Server, self(), Request)
 -spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
 async_server_call(Server, Request) ->
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 58a510b68..fedff69c3 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -79,15 +79,13 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
     {noreply, add_job(Job, State)};
 
-handle_cast({kill, FromRef}, #st{clients = Clients} = St) ->
-    case find_worker(FromRef, Clients) of
-        #job{worker = KeyRef, worker_pid = Pid} = Job ->
-            erlang:demonitor(KeyRef),
-            exit(Pid, kill),
-            {noreply, remove_job(Job, St)};
-        false ->
-            {noreply, St}
-    end;
+handle_cast({kill, FromRef}, St) ->
+    kill_worker(FromRef, St),
+    {noreply, St};
+
+handle_cast({kill_all, FromRefs}, St) ->
+    lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs),
+    {noreply, St};
 
 handle_cast(_, St) ->
     couch_log:notice("rexi_server ignored_cast", []),
@@ -181,3 +179,15 @@ find_worker(Ref, Tab) ->
 
 notify_caller({Caller, Ref}, Reason) ->
     rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+
+
+kill_worker(FromRef, #st{clients = Clients} = St) ->
+    case find_worker(FromRef, Clients) of
+        #job{worker = KeyRef, worker_pid = Pid} = Job ->
+            erlang:demonitor(KeyRef),
+            exit(Pid, kill),
+            remove_job(Job, St),
+            ok;
+        false ->
+            ok
+    end.
```
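For a sense of how the new export might be consumed, here is a hedged sketch
of a caller-side clean-up helper. `stop_workers/1` and its `Workers` list of
`{Node, Ref}` pairs are hypothetical; `rexi:kill/2` and `rexi:kill_all/1` are
the functions shown in the diff above.

```erlang
%% Hypothetical coordinator clean-up. Workers is assumed to be a list of
%% {Node, Ref} pairs for remote jobs that are no longer needed.
stop_workers(Workers) when is_list(Workers) ->
    %% Before kill_all/1, each worker needed its own cast:
    %%   [rexi:kill(Node, Ref) || {Node, Ref} <- Workers]
    %% kill_all/1 groups the refs by node, so only a single
    %% {kill_all, Refs} message is sent to each node's rexi server.
    ok = rexi:kill_all(Workers).
```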