diff options
Diffstat (limited to 'src/fabric/src/fabric_db_update_listener.erl')
-rw-r--r-- | src/fabric/src/fabric_db_update_listener.erl | 183 |
1 files changed, 0 insertions, 183 deletions
diff --git a/src/fabric/src/fabric_db_update_listener.erl b/src/fabric/src/fabric_db_update_listener.erl deleted file mode 100644 index 78ccf5a4d..000000000 --- a/src/fabric/src/fabric_db_update_listener.erl +++ /dev/null @@ -1,183 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(fabric_db_update_listener). - --export([go/4, start_update_notifier/1, stop/1, wait_db_updated/1]). --export([handle_db_event/3]). - --include_lib("fabric/include/fabric.hrl"). --include_lib("mem3/include/mem3.hrl"). - --record(worker, { - ref, - node, - pid -}). - --record(cb_state, { - client_pid, - client_ref, - notify -}). - --record(acc, { - parent, - state, - shards -}). - -go(Parent, ParentRef, DbName, Timeout) -> - Shards = mem3:shards(DbName), - Notifiers = start_update_notifiers(Shards), - MonRefs = lists:usort([rexi_utils:server_pid(N) || #worker{node = N} <- Notifiers]), - RexiMon = rexi_monitor:start(MonRefs), - MonPid = start_cleanup_monitor(self(), Notifiers), - %% This is not a common pattern for rexi but to enable the calling - %% process to communicate via handle_message/3 we "fake" it as a - %% a spawned worker. - Workers = [#worker{ref = ParentRef, pid = Parent} | Notifiers], - Acc = #acc{ - parent = Parent, - state = unset, - shards = Shards - }, - Resp = - try - receive_results(Workers, Acc, Timeout) - after - rexi_monitor:stop(RexiMon), - stop_cleanup_monitor(MonPid) - end, - case Resp of - {ok, _} -> ok; - {error, Error} -> erlang:error(Error); - Error -> erlang:error(Error) - end. - -start_update_notifiers(Shards) -> - EndPointDict = lists:foldl( - fun(#shard{node = Node, name = Name}, Acc) -> - dict:append(Node, Name, Acc) - end, - dict:new(), - Shards - ), - lists:map( - fun({Node, DbNames}) -> - Ref = rexi:cast(Node, {?MODULE, start_update_notifier, [DbNames]}), - #worker{ref = Ref, node = Node} - end, - dict:to_list(EndPointDict) - ). - -% rexi endpoint -start_update_notifier(DbNames) -> - {Caller, Ref} = get(rexi_from), - Notify = config:get("couchdb", "maintenance_mode", "false") /= "true", - State = #cb_state{client_pid = Caller, client_ref = Ref, notify = Notify}, - Options = [{parent, Caller}, {dbnames, DbNames}], - couch_event:listen(?MODULE, handle_db_event, State, Options). - -handle_db_event(_DbName, updated, #cb_state{notify = true} = St) -> - erlang:send(St#cb_state.client_pid, {St#cb_state.client_ref, db_updated}), - {ok, St}; -handle_db_event(_DbName, deleted, St) -> - erlang:send(St#cb_state.client_pid, {St#cb_state.client_ref, db_deleted}), - stop; -handle_db_event(_DbName, _Event, St) -> - {ok, St}. - -start_cleanup_monitor(Parent, Notifiers) -> - spawn(fun() -> - Ref = erlang:monitor(process, Parent), - cleanup_monitor(Parent, Ref, Notifiers) - end). - -stop_cleanup_monitor(MonPid) -> - MonPid ! {self(), stop}. - -cleanup_monitor(Parent, Ref, Notifiers) -> - receive - {'DOWN', Ref, _, _, _} -> - stop_update_notifiers(Notifiers); - {Parent, stop} -> - stop_update_notifiers(Notifiers); - Else -> - couch_log:error("Unkown message in ~w :: ~w", [?MODULE, Else]), - stop_update_notifiers(Notifiers), - exit(Parent, {unknown_message, Else}) - end. - -stop_update_notifiers(Notifiers) -> - rexi:kill_all([{N, Ref} || #worker{node = N, ref = Ref} <- Notifiers]). - -stop({Pid, Ref}) -> - erlang:send(Pid, {Ref, done}). - -wait_db_updated({Pid, Ref}) -> - MonRef = erlang:monitor(process, Pid), - erlang:send(Pid, {Ref, get_state}), - receive - {state, Pid, State} -> - erlang:demonitor(MonRef, [flush]), - State; - {'DOWN', MonRef, process, Pid, _Reason} -> - changes_feed_died - after 300000 -> - ?MODULE:wait_db_updated({Pid, Ref}) - end. - -receive_results(Workers, Acc0, Timeout) -> - Fun = fun handle_message/3, - case rexi_utils:recv(Workers, #worker.ref, Fun, Acc0, infinity, Timeout) of - {timeout, #acc{state = updated} = Acc} -> - receive_results(Workers, Acc, Timeout); - {timeout, #acc{state = waiting} = Acc} -> - erlang:send(Acc#acc.parent, {state, self(), timeout}), - receive_results(Workers, Acc#acc{state = unset}, Timeout); - {timeout, Acc} -> - receive_results(Workers, Acc#acc{state = timeout}, Timeout); - {_, Acc} -> - {ok, Acc} - end. - -handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) -> - handle_error(Node, {nodedown, Node}, Acc); -handle_message({rexi_EXIT, _Reason}, Worker, Acc) -> - handle_error(Worker#worker.node, {worker_exit, Worker}, Acc); -handle_message({gen_event_EXIT, Node, Reason}, _Worker, Acc) -> - handle_error(Node, {gen_event_EXIT, Node, Reason}, Acc); -handle_message(db_updated, _Worker, #acc{state = waiting} = Acc) -> - % propagate message to calling controller - erlang:send(Acc#acc.parent, {state, self(), updated}), - {ok, Acc#acc{state = unset}}; -handle_message(db_updated, _Worker, Acc) -> - {ok, Acc#acc{state = updated}}; -handle_message(db_deleted, _Worker, _Acc) -> - {stop, ok}; -handle_message(get_state, _Worker, #acc{state = unset} = Acc) -> - {ok, Acc#acc{state = waiting}}; -handle_message(get_state, _Worker, Acc) -> - erlang:send(Acc#acc.parent, {state, self(), Acc#acc.state}), - {ok, Acc#acc{state = unset}}; -handle_message(done, _, _) -> - {stop, ok}. - -handle_error(Node, Reason, #acc{shards = Shards} = Acc) -> - Rest = lists:filter(fun(#shard{node = N}) -> N /= Node end, Shards), - case fabric_ring:is_progress_possible([{R, nil} || R <- Rest]) of - true -> - {ok, Acc#acc{shards = Rest}}; - false -> - {error, Reason} - end. |