summaryrefslogtreecommitdiff
path: root/src/fabric/src/fabric_db_update_listener.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/fabric/src/fabric_db_update_listener.erl')
-rw-r--r--src/fabric/src/fabric_db_update_listener.erl68
1 files changed, 37 insertions, 31 deletions
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl
index fb2937be1..78ccf5a4d 100644
--- a/src/fabric/src/fabric_db_update_listener.erl
+++ b/src/fabric/src/fabric_db_update_listener.erl
@@ -45,18 +45,19 @@ go(Parent, ParentRef, DbName, Timeout) ->
%% This is not a common pattern for rexi but to enable the calling
%% process to communicate via handle_message/3 we "fake" it as a
%% a spawned worker.
- Workers = [#worker{ref=ParentRef, pid=Parent} | Notifiers],
+ Workers = [#worker{ref = ParentRef, pid = Parent} | Notifiers],
Acc = #acc{
parent = Parent,
state = unset,
shards = Shards
},
- Resp = try
- receive_results(Workers, Acc, Timeout)
- after
- rexi_monitor:stop(RexiMon),
- stop_cleanup_monitor(MonPid)
- end,
+ Resp =
+ try
+ receive_results(Workers, Acc, Timeout)
+ after
+ rexi_monitor:stop(RexiMon),
+ stop_cleanup_monitor(MonPid)
+ end,
case Resp of
{ok, _} -> ok;
{error, Error} -> erlang:error(Error);
@@ -64,13 +65,20 @@ go(Parent, ParentRef, DbName, Timeout) ->
end.
start_update_notifiers(Shards) ->
- EndPointDict = lists:foldl(fun(#shard{node=Node, name=Name}, Acc) ->
- dict:append(Node, Name, Acc)
- end, dict:new(), Shards),
- lists:map(fun({Node, DbNames}) ->
- Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}),
- #worker{ref=Ref, node=Node}
- end, dict:to_list(EndPointDict)).
+ EndPointDict = lists:foldl(
+ fun(#shard{node = Node, name = Name}, Acc) ->
+ dict:append(Node, Name, Acc)
+ end,
+ dict:new(),
+ Shards
+ ),
+ lists:map(
+ fun({Node, DbNames}) ->
+ Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}),
+ #worker{ref = Ref, node = Node}
+ end,
+ dict:to_list(EndPointDict)
+ ).
% rexi endpoint
start_update_notifier(DbNames) ->
@@ -132,41 +140,39 @@ wait_db_updated({Pid, Ref}) ->
receive_results(Workers, Acc0, Timeout) ->
Fun = fun handle_message/3,
case rexi_utils:recv(Workers, #worker.ref, Fun, Acc0, infinity, Timeout) of
- {timeout, #acc{state=updated}=Acc} ->
- receive_results(Workers, Acc, Timeout);
- {timeout, #acc{state=waiting}=Acc} ->
- erlang:send(Acc#acc.parent, {state, self(), timeout}),
- receive_results(Workers, Acc#acc{state=unset}, Timeout);
- {timeout, Acc} ->
- receive_results(Workers, Acc#acc{state=timeout}, Timeout);
- {_, Acc} ->
- {ok, Acc}
+ {timeout, #acc{state = updated} = Acc} ->
+ receive_results(Workers, Acc, Timeout);
+ {timeout, #acc{state = waiting} = Acc} ->
+ erlang:send(Acc#acc.parent, {state, self(), timeout}),
+ receive_results(Workers, Acc#acc{state = unset}, Timeout);
+ {timeout, Acc} ->
+ receive_results(Workers, Acc#acc{state = timeout}, Timeout);
+ {_, Acc} ->
+ {ok, Acc}
end.
-
handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
handle_error(Node, {nodedown, Node}, Acc);
handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
handle_error(Worker#worker.node, {worker_exit, Worker}, Acc);
handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) ->
handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc);
-handle_message(db_updated, _Worker, #acc{state=waiting}=Acc) ->
+handle_message(db_updated, _Worker, #acc{state = waiting} = Acc) ->
% propagate message to calling controller
erlang:send(Acc#acc.parent, {state, self(), updated}),
- {ok, Acc#acc{state=unset}};
+ {ok, Acc#acc{state = unset}};
handle_message(db_updated, _Worker, Acc) ->
- {ok, Acc#acc{state=updated}};
+ {ok, Acc#acc{state = updated}};
handle_message(db_deleted, _Worker, _Acc) ->
{stop, ok};
-handle_message(get_state, _Worker, #acc{state=unset}=Acc) ->
- {ok, Acc#acc{state=waiting}};
+handle_message(get_state, _Worker, #acc{state = unset} = Acc) ->
+ {ok, Acc#acc{state = waiting}};
handle_message(get_state, _Worker, Acc) ->
erlang:send(Acc#acc.parent, {state, self(), Acc#acc.state}),
- {ok, Acc#acc{state=unset}};
+ {ok, Acc#acc{state = unset}};
handle_message(done, _, _) ->
{stop, ok}.
-
handle_error(Node, Reason, #acc{shards = Shards} = Acc) ->
Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards),
case fabric_ring:is_progress_possible([{R, nil} || R <- Rest]) of