Diffstat (limited to 'src/fabric/src/fabric_streams.erl')
-rw-r--r--  src/fabric/src/fabric_streams.erl  132
1 file changed, 71 insertions, 61 deletions
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
index 59c8b8a6b..2a3a2b004 100644
--- a/src/fabric/src/fabric_streams.erl
+++ b/src/fabric/src/fabric_streams.erl
@@ -23,22 +23,17 @@
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-
-define(WORKER_CLEANER, fabric_worker_cleaner).
-
start(Workers, Keypos) ->
start(Workers, Keypos, undefined, undefined).
-
start(Workers, Keypos, RingOpts) ->
start(Workers, Keypos, undefined, undefined, RingOpts).
-
start(Workers, Keypos, StartFun, Replacements) ->
start(Workers, Keypos, StartFun, Replacements, []).
-
start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Fun = fun handle_stream_start/3,
Acc = #stream_acc{
@@ -52,16 +47,19 @@ start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
Timeout = fabric_util:request_timeout(),
case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
{ok, #stream_acc{ready = Workers}} ->
- AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
- rexi:stream_start(From),
- [Worker | WorkerAcc]
- end, [], Workers),
+ AckedWorkers = fabric_dict:fold(
+ fun(Worker, From, WorkerAcc) ->
+ rexi:stream_start(From),
+ [Worker | WorkerAcc]
+ end,
+ [],
+ Workers
+ ),
{ok, AckedWorkers};
Else ->
Else
end.
-
cleanup(Workers) ->
% Stop the auxiliary cleaner process as we got to the point where cleanup
% happens in the regular fashion so we don't want to send 2x the number of kill
@@ -75,7 +73,6 @@ cleanup(Workers) ->
end,
fabric_util:cleanup(Workers).
-
handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
#stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
@@ -84,7 +81,6 @@ handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
error ->
{error, {nodedown, <<"progress not possible">>}}
end;
-
handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
#stream_acc{
workers = Workers,
@@ -100,11 +96,15 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
% and start the new workers if so.
case lists:keytake(Worker#shard.range, 1, Replacements) of
{value, {_Range, WorkerReplacements}, NewReplacements} ->
- FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
- NewWorker = (St#stream_acc.start_fun)(Repl),
- add_worker_to_cleaner(self(), NewWorker),
- fabric_dict:store(NewWorker, waiting, NewWorkers)
- end, Workers, WorkerReplacements),
+ FinalWorkers = lists:foldl(
+ fun(Repl, NewWorkers) ->
+ NewWorker = (St#stream_acc.start_fun)(Repl),
+ add_worker_to_cleaner(self(), NewWorker),
+ fabric_dict:store(NewWorker, waiting, NewWorkers)
+ end,
+ Workers,
+ WorkerReplacements
+ ),
% Assert that our replaced worker provides us
% the opportunity to make progress. Need to make sure
% to include already processed responses, since we are
@@ -115,8 +115,8 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
true = fabric_ring:is_progress_possible(AllWorkers),
NewRefs = fabric_dict:fetch_keys(FinalWorkers),
{new_refs, NewRefs, St#stream_acc{
- workers=FinalWorkers,
- replacements=NewReplacements
+ workers = FinalWorkers,
+ replacements = NewReplacements
}};
false ->
% If progress isn't possible and we don't have any
@@ -126,36 +126,32 @@ handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
{error, _} ->
{error, fabric_util:error_info(Reason)}
end;
-
handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
#stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
case fabric_dict:lookup_element(Worker, Workers) of
- undefined ->
- % This worker lost the race with other partition copies, terminate
- rexi:stream_cancel(From),
- {ok, St};
- waiting ->
- case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
- {ok, {Workers1, Ready1}} ->
- % Don't have a full ring yet. Keep getting responses
- {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
- {stop, Ready1} ->
- % Have a full ring of workers. But don't ack the worker
- % yet so they don't start sending us rows until we're ready
- {stop, St#stream_acc{workers = [], ready = Ready1}}
- end
+ undefined ->
+ % This worker lost the race with other partition copies, terminate
+ rexi:stream_cancel(From),
+ {ok, St};
+ waiting ->
+ case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
+ {ok, {Workers1, Ready1}} ->
+ % Don't have a full ring yet. Keep getting responses
+ {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
+ {stop, Ready1} ->
+ % Have a full ring of workers. But don't ack the worker
+ % yet so they don't start sending us rows until we're ready
+ {stop, St#stream_acc{workers = [], ready = Ready1}}
+ end
end;
-
handle_stream_start({ok, ddoc_updated}, _, St) ->
WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
cleanup(WaitingWorkers ++ ReadyWorkers),
{stop, ddoc_updated};
-
handle_stream_start(Else, _, _) ->
exit({invalid_stream_start, Else}).
-
% Spawn an auxiliary rexi worker cleaner. This will be used in cases
% when the coordinator (request) process is forcibly killed and doesn't
% get a chance to process its `after` fabric:clean/1 clause.
@@ -168,10 +164,9 @@ spawn_worker_cleaner(Coordinator, Workers) ->
end),
put(?WORKER_CLEANER, Pid),
Pid;
- ExistingCleaner ->
+ ExistingCleaner ->
ExistingCleaner
- end.
-
+ end.
cleaner_loop(Pid, Workers) ->
receive
@@ -181,7 +176,6 @@ cleaner_loop(Pid, Workers) ->
fabric_util:cleanup(Workers)
end.
-
add_worker_to_cleaner(CoordinatorPid, Worker) ->
case get(?WORKER_CLEANER) of
CleanerPid when is_pid(CleanerPid) ->
@@ -190,25 +184,27 @@ add_worker_to_cleaner(CoordinatorPid, Worker) ->
ok
end.
-
-
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
worker_cleaner_test_() ->
{
- "Fabric spawn_worker_cleaner test", {
- setup, fun setup/0, fun teardown/1,
- fun(_) -> [
- should_clean_workers(),
- does_not_fire_if_cleanup_called(),
- should_clean_additional_worker_too()
- ] end
+ "Fabric spawn_worker_cleaner test",
+ {
+ setup,
+ fun setup/0,
+ fun teardown/1,
+ fun(_) ->
+ [
+ should_clean_workers(),
+ does_not_fire_if_cleanup_called(),
+ should_clean_additional_worker_too()
+ ]
+ end
}
}.
-
should_clean_workers() ->
?_test(begin
meck:reset(rexi),
@@ -217,15 +213,20 @@ should_clean_workers() ->
#shard{node = 'n1', ref = make_ref()},
#shard{node = 'n2', ref = make_ref()}
],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ {Coord, _} = spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end),
Cleaner = spawn_worker_cleaner(Coord, Workers),
Ref = erlang:monitor(process, Cleaner),
Coord ! die,
- receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+ receive
+ {'DOWN', Ref, _, Cleaner, _} -> ok
+ end,
?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
-
does_not_fire_if_cleanup_called() ->
?_test(begin
meck:reset(rexi),
@@ -234,18 +235,23 @@ does_not_fire_if_cleanup_called() ->
#shard{node = 'n1', ref = make_ref()},
#shard{node = 'n2', ref = make_ref()}
],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ {Coord, _} = spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end),
Cleaner = spawn_worker_cleaner(Coord, Workers),
Ref = erlang:monitor(process, Cleaner),
cleanup(Workers),
Coord ! die,
- receive {'DOWN', Ref, _, _, _} -> ok end,
+ receive
+ {'DOWN', Ref, _, _, _} -> ok
+ end,
% 2 calls would be from cleanup/1 function. If cleanup process fired
% too it would have been 4 calls total.
?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
-
should_clean_additional_worker_too() ->
?_test(begin
meck:reset(rexi),
@@ -253,20 +259,24 @@ should_clean_additional_worker_too() ->
Workers = [
#shard{node = 'n1', ref = make_ref()}
],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
+ {Coord, _} = spawn_monitor(fun() ->
+ receive
+ die -> ok
+ end
+ end),
Cleaner = spawn_worker_cleaner(Coord, Workers),
add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
Ref = erlang:monitor(process, Cleaner),
Coord ! die,
- receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
+ receive
+ {'DOWN', Ref, _, Cleaner, _} -> ok
+ end,
?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
end).
-
setup() ->
ok = meck:expect(rexi, kill_all, fun(_) -> ok end).
-
teardown(_) ->
meck:unload().
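
For readers skimming the diff rather than the module, the reformatted code above implements a monitor-based cleanup pattern: spawn_worker_cleaner/2 starts an auxiliary process that monitors the coordinator, add_worker_to_cleaner/2 registers additional workers with it, and cleaner_loop/2 runs fabric_util:cleanup/1 if the coordinator dies before reaching its own cleanup path. Below is a minimal, self-contained sketch of that pattern only; the module name worker_cleaner_sketch, the KillFun argument, and the {add_worker, Worker} message shape are illustrative assumptions, not fabric's actual API.

-module(worker_cleaner_sketch).
-export([spawn_cleaner/2, add_worker/2]).

%% Spawn a process that monitors Coordinator and, once it exits for any
%% reason, applies KillFun to every worker registered so far. In fabric
%% the analogous step is fabric_util:cleanup/1 over shard records.
spawn_cleaner(Coordinator, KillFun) ->
    spawn(fun() ->
        Ref = erlang:monitor(process, Coordinator),
        cleaner_loop(Ref, KillFun, [])
    end).

%% Register one more worker with an already running cleaner, mirroring
%% add_worker_to_cleaner/2 in the diff above.
add_worker(Cleaner, Worker) ->
    Cleaner ! {add_worker, Worker},
    ok.

cleaner_loop(Ref, KillFun, Workers) ->
    receive
        {add_worker, Worker} ->
            cleaner_loop(Ref, KillFun, [Worker | Workers]);
        {'DOWN', Ref, process, _Coordinator, _Reason} ->
            %% Coordinator died without doing its normal cleanup; make a
            %% best-effort attempt to stop every known worker.
            lists:foreach(KillFun, Workers)
    end.

Usage under these assumptions: Cleaner = worker_cleaner_sketch:spawn_cleaner(self(), fun(W) -> exit(W, kill) end), followed by worker_cleaner_sketch:add_worker(Cleaner, WorkerPid) for each spawned worker pid; the sketch treats workers as local pids, whereas the real module tracks #shard{} records and kills remote rexi workers.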