diff options
Diffstat (limited to 'src/fabric/src/fabric_db_update_listener.erl')
-rw-r--r-- | src/fabric/src/fabric_db_update_listener.erl | 68 |
1 files changed, 37 insertions, 31 deletions
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl index fb2937be1..78ccf5a4d 100644 --- a/src/fabric/src/fabric_db_update_listener.erl +++ b/src/fabric/src/fabric_db_update_listener.erl @@ -45,18 +45,19 @@ go(Parent, ParentRef, DbName, Timeout) -> %% This is not a common pattern for rexi but to enable the calling %% process to communicate via handle_message/3 we "fake" it as a %% a spawned worker. - Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers], + Workers = [#worker{ref = ParentRef, pid = Parent} | Notifiers], Acc = #acc{ parent = Parent, state = unset, shards = Shards }, - Resp = try - receive_results(Workers, Acc, Timeout) - after - rexi_monitor:stop(RexiMon), - stop_cleanup_monitor(MonPid) - end, + Resp = + try + receive_results(Workers, Acc, Timeout) + after + rexi_monitor:stop(RexiMon), + stop_cleanup_monitor(MonPid) + end, case Resp of {ok, _} -> ok; {error, Error} -> erlang:error(Error); @@ -64,13 +65,20 @@ go(Parent, ParentRef, DbName, Timeout) -> end. start_update_notifiers(Shards) -> - EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) -> - dict:append(Node, Name, Acc) - end, dict:new(), Shards), - lists:map(fun({Node, DbNames}) -> - Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}), - #worker{ref=Ref, node=Node} - end, dict:to_list(EndPointDict)). + EndPointDict = lists:foldl( + fun(#shard{node = Node, name = Name}, Acc) -> + dict:append(Node, Name, Acc) + end, + dict:new(), + Shards + ), + lists:map( + fun({Node, DbNames}) -> + Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}), + #worker{ref = Ref, node = Node} + end, + dict:to_list(EndPointDict) + ). % rexi endpoint start_update_notifier(DbNames) -> @@ -132,41 +140,39 @@ wait_db_updated({Pid, Ref}) -> receive_results(Workers, Acc0, Timeout) -> Fun = fun handle_message/3, case rexi_utils:recv(Workers, #worker.ref, Fun, Acc0, infinity, Timeout) of - {timeout, #acc{state=updated}=Acc} -> - receive_results(Workers, Acc, Timeout); - {timeout, #acc{state=waiting}=Acc} -> - erlang:send(Acc#acc.parent, {state, self(), timeout}), - receive_results(Workers, Acc#acc{state=unset}, Timeout); - {timeout, Acc} -> - receive_results(Workers, Acc#acc{state=timeout}, Timeout); - {_, Acc} -> - {ok, Acc} + {timeout, #acc{state = updated} = Acc} -> + receive_results(Workers, Acc, Timeout); + {timeout, #acc{state = waiting} = Acc} -> + erlang:send(Acc#acc.parent, {state, self(), timeout}), + receive_results(Workers, Acc#acc{state = unset}, Timeout); + {timeout, Acc} -> + receive_results(Workers, Acc#acc{state = timeout}, Timeout); + {_, Acc} -> + {ok, Acc} end. - handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> handle_error(Node, {nodedown, Node}, Acc); handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> handle_error(Worker#worker.node, {worker_exit, Worker}, Acc); handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) -> handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc); -handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) -> +handle_message(db_updated, _Worker, #acc{state = waiting} = Acc) -> % propagate message to calling controller erlang:send(Acc#acc.parent, {state, self(), updated}), - {ok, Acc#acc{state=unset}}; + {ok, Acc#acc{state = unset}}; handle_message(db_updated, _Worker, Acc) -> - {ok, Acc#acc{state=updated}}; + {ok, Acc#acc{state = updated}}; handle_message(db_deleted, _Worker, _Acc) -> {stop, ok}; -handle_message(get_state, _Worker, #acc{state=unset}=Acc) -> - {ok, Acc#acc{state=waiting}}; +handle_message(get_state, _Worker, #acc{state = unset} = Acc) -> + {ok, Acc#acc{state = waiting}}; handle_message(get_state, _Worker, Acc) -> erlang:send(Acc#acc.parent, {state, self(), Acc#acc.state}), - {ok, Acc#acc{state=unset}}; + {ok, Acc#acc{state = unset}}; handle_message(done, _, _) -> {stop, ok}. - handle_error(Node, Reason, #acc{shards = Shards} = Acc) -> Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards), case fabric_ring:is_progress_possible([{R, nil} || R <- Rest]) of |