summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_streams.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_streams.erl')
-rw-r--r--src/fabric/src/fabric_streams.erl273
1 files changed, 0 insertions, 273 deletions
diff --git a/src/fabric/src/fabric_streams.erl b/src/fabric/src/fabric_streams.erl
deleted file mode 100644
index 59c8b8a6b..000000000
--- a/src/fabric/src/fabric_streams.erl
+++ /dev/null
@@ -1,273 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(fabric_streams).
-
--export([
- start/2,
- start/3,
- start/4,
- start/5,
- cleanup/1
-]).
-
--include_lib("fabric/include/fabric.hrl").
--include_lib("mem3/include/mem3.hrl").
-
-
--define(WORKER_CLEANER, fabric_worker_cleaner).
-
-
-start(Workers, Keypos) ->
- start(Workers, Keypos, undefined, undefined).
-
-
-start(Workers, Keypos, RingOpts) ->
- start(Workers, Keypos, undefined, undefined, RingOpts).
-
-
-start(Workers, Keypos, StartFun, Replacements) ->
- start(Workers, Keypos, StartFun, Replacements, []).
-
-
-start(Workers0, Keypos, StartFun, Replacements, RingOpts) ->
- Fun = fun handle_stream_start/3,
- Acc = #stream_acc{
- workers = fabric_dict:init(Workers0, waiting),
- ready = [],
- start_fun = StartFun,
- replacements = Replacements,
- ring_opts = RingOpts
- },
- spawn_worker_cleaner(self(), Workers0),
- Timeout = fabric_util:request_timeout(),
- case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
- {ok, #stream_acc{ready = Workers}} ->
- AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
- rexi:stream_start(From),
- [Worker | WorkerAcc]
- end, [], Workers),
- {ok, AckedWorkers};
- Else ->
- Else
- end.
-
-
-cleanup(Workers) ->
- % Stop the auxiliary cleaner process as we got to the point where cleanup
- % happesn in the regular fashion so we don't want to send 2x the number kill
- % messages
- case get(?WORKER_CLEANER) of
- CleanerPid when is_pid(CleanerPid) ->
- erase(?WORKER_CLEANER),
- exit(CleanerPid, kill);
- _ ->
- ok
- end,
- fabric_util:cleanup(Workers).
-
-
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
- #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
- case fabric_ring:node_down(NodeRef, Workers, Ready, RingOpts) of
- {ok, Workers1} ->
- {ok, St#stream_acc{workers = Workers1}};
- error ->
- {error, {nodedown, <<"progress not possible">>}}
- end;
-
-handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
- #stream_acc{
- workers = Workers,
- ready = Ready,
- replacements = Replacements,
- ring_opts = RingOpts
- } = St,
- case {fabric_ring:handle_error(Worker, Workers, Ready, RingOpts), Reason} of
- {{ok, Workers1}, _Reason} ->
- {ok, St#stream_acc{workers = Workers1}};
- {error, {maintenance_mode, _Node}} when Replacements /= undefined ->
- % Check if we have replacements for this range
- % and start the new workers if so.
- case lists:keytake(Worker#shard.range, 1, Replacements) of
- {value, {_Range, WorkerReplacements}, NewReplacements} ->
- FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
- NewWorker = (St#stream_acc.start_fun)(Repl),
- add_worker_to_cleaner(self(), NewWorker),
- fabric_dict:store(NewWorker, waiting, NewWorkers)
- end, Workers, WorkerReplacements),
- % Assert that our replaced worker provides us
- % the oppurtunity to make progress. Need to make sure
- % to include already processed responses, since we are
- % checking the full range and some workers have already
- % responded and were removed from the workers list
- ReadyWorkers = [{W, R} || {_, W, R} <- Ready],
- AllWorkers = FinalWorkers ++ ReadyWorkers,
- true = fabric_ring:is_progress_possible(AllWorkers),
- NewRefs = fabric_dict:fetch_keys(FinalWorkers),
- {new_refs, NewRefs, St#stream_acc{
- workers=FinalWorkers,
- replacements=NewReplacements
- }};
- false ->
- % If we progress isn't possible and we don't have any
- % replacements then we're dead in the water.
- {error, {nodedown, <<"progress not possible">>}}
- end;
- {error, _} ->
- {error, fabric_util:error_info(Reason)}
- end;
-
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
- #stream_acc{workers = Workers, ready = Ready, ring_opts = RingOpts} = St,
- case fabric_dict:lookup_element(Worker, Workers) of
- undefined ->
- % This worker lost the race with other partition copies, terminate
- rexi:stream_cancel(From),
- {ok, St};
- waiting ->
- case fabric_ring:handle_response(Worker, From, Workers, Ready, RingOpts) of
- {ok, {Workers1, Ready1}} ->
- % Don't have a full ring yet. Keep getting responses
- {ok, St#stream_acc{workers = Workers1, ready = Ready1}};
- {stop, Ready1} ->
- % Have a full ring of workers. But don't ack the worker
- % yet so they don't start sending us rows until we're ready
- {stop, St#stream_acc{workers = [], ready = Ready1}}
- end
- end;
-
-handle_stream_start({ok, ddoc_updated}, _, St) ->
- WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
- ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
- cleanup(WaitingWorkers ++ ReadyWorkers),
- {stop, ddoc_updated};
-
-handle_stream_start(Else, _, _) ->
- exit({invalid_stream_start, Else}).
-
-
-% Spawn an auxiliary rexi worker cleaner. This will be used in cases
-% when the coordinator (request) process is forceably killed and doesn't
-% get a chance to process its `after` fabric:clean/1 clause.
-spawn_worker_cleaner(Coordinator, Workers) ->
- case get(?WORKER_CLEANER) of
- undefined ->
- Pid = spawn(fun() ->
- erlang:monitor(process, Coordinator),
- cleaner_loop(Coordinator, Workers)
- end),
- put(?WORKER_CLEANER, Pid),
- Pid;
- ExistingCleaner ->
- ExistingCleaner
- end.
-
-
-cleaner_loop(Pid, Workers) ->
- receive
- {add_worker, Pid, Worker} ->
- cleaner_loop(Pid, [Worker | Workers]);
- {'DOWN', _, _, Pid, _} ->
- fabric_util:cleanup(Workers)
- end.
-
-
-add_worker_to_cleaner(CoordinatorPid, Worker) ->
- case get(?WORKER_CLEANER) of
- CleanerPid when is_pid(CleanerPid) ->
- CleanerPid ! {add_worker, CoordinatorPid, Worker};
- _ ->
- ok
- end.
-
-
-
--ifdef(TEST).
-
--include_lib("eunit/include/eunit.hrl").
-
-worker_cleaner_test_() ->
- {
- "Fabric spawn_worker_cleaner test", {
- setup, fun setup/0, fun teardown/1,
- fun(_) -> [
- should_clean_workers(),
- does_not_fire_if_cleanup_called(),
- should_clean_additional_worker_too()
- ] end
- }
- }.
-
-
-should_clean_workers() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()},
- #shard{node = 'n2', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- Ref = erlang:monitor(process, Cleaner),
- Coord ! die,
- receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
-
-
-does_not_fire_if_cleanup_called() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()},
- #shard{node = 'n2', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- Ref = erlang:monitor(process, Cleaner),
- cleanup(Workers),
- Coord ! die,
- receive {'DOWN', Ref, _, _, _} -> ok end,
- % 2 calls would be from cleanup/1 function. If cleanup process fired
- % too it would have been 4 calls total.
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
-
-
-should_clean_additional_worker_too() ->
- ?_test(begin
- meck:reset(rexi),
- erase(?WORKER_CLEANER),
- Workers = [
- #shard{node = 'n1', ref = make_ref()}
- ],
- {Coord, _} = spawn_monitor(fun() -> receive die -> ok end end),
- Cleaner = spawn_worker_cleaner(Coord, Workers),
- add_worker_to_cleaner(Coord, #shard{node = 'n2', ref = make_ref()}),
- Ref = erlang:monitor(process, Cleaner),
- Coord ! die,
- receive {'DOWN', Ref, _, Cleaner, _} -> ok end,
- ?assertEqual(1, meck:num_calls(rexi, kill_all, 1))
- end).
-
-
-setup() ->
- ok = meck:expect(rexi, kill_all, fun(_) -> ok end).
-
-
-teardown(_) ->
- meck:unload().
-
--endif.