author     Nick Vatamaniuc <vatamane@apache.org>              2019-03-18 13:32:14 -0400
committer  Nick Vatamaniuc <nickva@users.noreply.github.com>  2019-04-03 10:48:45 -0400
commit     d10b7955929c1581f192fc487840cbf5005f84c9 (patch)
tree       7b66fab88eba4c39c1a95919792cb972be7157e7 /src/rexi
parent     3227e61ebbe6ee15d122036ec85b57ebffeb062d (diff)
Uneven shard copy handling in mem3 and fabric
The introduction of shard splitting will eliminate the constraint that all document copies are located in shards with the same range boundaries. That assumption was made by default in mem3 and fabric functions that do shard replacement, worker spawning, unpacking `_changes` update sequences, and some other things. This commit updates those places to handle the case where document copies might be in different shard ranges.

A good place to start from is the `mem3_util:get_ring()` function. This function returns a full, non-overlapping ring from a set of possibly overlapping shards. It is used by almost everything else in this commit:

1) It's used when only a single copy of the data is needed, for example during `_all_docs` or `_changes` processing.

2) It's used when checking if progress is possible after some nodes died. `get_ring()` returning `[]` when it cannot find a full ring is used to indicate that progress is not possible.

3) It's used during shard replacement. This is perhaps the most complicated case. During replacement, besides just finding a possible covering of the ring from the set of shards, it is also desirable to find one that minimizes the number of workers that have to be replaced. A neat trick used here is to provide `get_ring()` with a custom sort function which prioritizes certain shard copies over others. In the case of replacements it prioritizes shards for which workers have already been spawned. In the default case `get_ring()` will prioritize longer ranges over shorter ones, so, for example, to cover the interval [00-ff] with either the [00-7f, 80-ff] pair or the single [00-ff] range, it will pick the single [00-ff] range instead of the [00-7f, 80-ff] pair. A simplified sketch of this idea follows below.

Co-authored-by: Paul J. Davis <davisp@apache.org>
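To make the ring-covering idea concrete, here is a deliberately simplified, greedy sketch. It is not the actual `mem3_util:get_ring()` implementation (whose behaviour is only summarized above); the module name, the `{Begin, End}` integer-pair representation of shard ranges, and the function signature are all made up for illustration. A caller-supplied sort function decides which of several overlapping copies wins, mirroring the prioritization described in point 3.

-module(ring_sketch).
-export([get_ring/4]).

%% Assemble a non-overlapping covering of [Start, Stop] from possibly
%% overlapping {Begin, End} ranges. SortFun decides which candidate wins
%% when several ranges start at the same position. Returns [] when no
%% full ring can be built (the "progress not possible" signal above).
get_ring(Ranges, SortFun, Start, Stop) ->
    walk(lists:sort(SortFun, Ranges), Start, Stop, []).

walk(_Ranges, Pos, Stop, Acc) when Pos > Stop ->
    lists:reverse(Acc);
walk(Ranges, Pos, Stop, Acc) ->
    %% Candidates starting exactly at Pos, kept in SortFun priority order.
    case [R || {B, _} = R <- Ranges, B =:= Pos] of
        [{_, End} = Next | _] ->
            walk(Ranges, End + 1, Stop, [Next | Acc]);
        [] ->
            []
    end.

For example, with a longest-range-first sort the single full-range copy wins over the split pair, matching the [00-ff] example in the message:

    Longest = fun({B1, E1}, {B2, E2}) -> E1 - B1 >= E2 - B2 end,
    [{0, 255}] = ring_sketch:get_ring([{0, 127}, {128, 255}, {0, 255}], Longest, 0, 255).

The real function is more thorough; in particular this greedy walk never backtracks, whereas the real `get_ring()` has to cope with arbitrary combinations of overlapping copies.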
Diffstat (limited to 'src/rexi')
-rw-r--r--  src/rexi/src/rexi.erl         14
-rw-r--r--  src/rexi/src/rexi_server.erl  28
2 files changed, 32 insertions, 10 deletions
diff --git a/src/rexi/src/rexi.erl b/src/rexi/src/rexi.erl
index f774dc9d4..21e2b5388 100644
--- a/src/rexi/src/rexi.erl
+++ b/src/rexi/src/rexi.erl
@@ -12,7 +12,7 @@
-module(rexi).
-export([start/0, stop/0, restart/0]).
--export([cast/2, cast/3, cast/4, kill/2]).
+-export([cast/2, cast/3, cast/4, kill/2, kill_all/1]).
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
-export([stream_init/0, stream_init/1]).
@@ -76,6 +76,18 @@ kill(Node, Ref) ->
rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill, Ref})),
ok.
+%% @doc Sends an async kill signal to the remote processes associated with Refs.
+%% No rexi_EXIT message will be sent.
+-spec kill_all([{node(), reference()}]) -> ok.
+kill_all(NodeRefs) when is_list(NodeRefs) ->
+ PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) ->
+ maps:update_with(Node, fun(Refs) -> [Ref | Refs] end, [Ref], Acc)
+ end, #{}, NodeRefs),
+ maps:map(fun(Node, Refs) ->
+ rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill_all, Refs}))
+ end, PerNodeMap),
+ ok.
+
%% @equiv async_server_call(Server, self(), Request)
-spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
async_server_call(Server, Request) ->
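To illustrate the batching done by the `kill_all/1` function added above: the fold turns a flat `[{Node, Ref}]` list into a per-node map, so each `rexi_server` receives a single `{kill_all, Refs}` cast no matter how many of the workers it hosts. The node names and refs below are made-up placeholders (real callers would pass `reference()` values obtained from `rexi:cast/2,3,4`):

    NodeRefs = [{'node1@host', ref_a}, {'node1@host', ref_b}, {'node2@host', ref_c}],
    PerNodeMap = lists:foldl(fun({Node, Ref}, Acc) ->
        maps:update_with(Node, fun(Refs) -> [Ref | Refs] end, [Ref], Acc)
    end, #{}, NodeRefs),
    %% PerNodeMap =:= #{'node1@host' => [ref_b, ref_a], 'node2@host' => [ref_c]}
    %% so one {kill_all, [ref_b, ref_a]} cast goes to node1@host and one
    %% {kill_all, [ref_c]} cast goes to node2@host, instead of three
    %% separate {kill, Ref} casts.

The per-node ref lists come out in reverse order of appearance, which is harmless since the server simply kills each referenced worker.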
diff --git a/src/rexi/src/rexi_server.erl b/src/rexi/src/rexi_server.erl
index 58a510b68..fedff69c3 100644
--- a/src/rexi/src/rexi_server.erl
+++ b/src/rexi/src/rexi_server.erl
@@ -79,15 +79,13 @@ handle_cast({doit, {ClientPid, ClientRef} = From, Nonce, MFA}, State) ->
{noreply, add_job(Job, State)};
-handle_cast({kill, FromRef}, #st{clients = Clients} = St) ->
- case find_worker(FromRef, Clients) of
- #job{worker = KeyRef, worker_pid = Pid} = Job ->
- erlang:demonitor(KeyRef),
- exit(Pid, kill),
- {noreply, remove_job(Job, St)};
- false ->
- {noreply, St}
- end;
+handle_cast({kill, FromRef}, St) ->
+ kill_worker(FromRef, St),
+ {noreply, St};
+
+handle_cast({kill_all, FromRefs}, St) ->
+ lists:foreach(fun(FromRef) -> kill_worker(FromRef, St) end, FromRefs),
+ {noreply, St};
handle_cast(_, St) ->
couch_log:notice("rexi_server ignored_cast", []),
@@ -181,3 +179,15 @@ find_worker(Ref, Tab) ->
notify_caller({Caller, Ref}, Reason) ->
rexi_utils:send(Caller, {Ref, {rexi_EXIT, Reason}}).
+
+
+kill_worker(FromRef, #st{clients = Clients} = St) ->
+ case find_worker(FromRef, Clients) of
+ #job{worker = KeyRef, worker_pid = Pid} = Job ->
+ erlang:demonitor(KeyRef),
+ exit(Pid, kill),
+ remove_job(Job, St),
+ ok;
+ false ->
+ ok
+ end.
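A note on the server-side refactor above: `kill_worker/2` discards the result of `remove_job/2` and both kill handlers return `{noreply, St}` with the state unchanged. This is presumably fine because, judging from `find_worker/2` taking the clients table as a `Tab` argument, the per-job bookkeeping in this module lives in ETS tables referenced from `#st{}`, so `remove_job/2` takes effect as a table delete rather than by producing a new state record. And, as documented for `kill_all/1`, no `rexi_EXIT` message is sent back to the coordinator: the coordinator is the one tearing the workers down, so there is nothing left to notify.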