diff options
Diffstat (limited to 'src/mem3/src/mem3_nodes.erl')
-rw-r--r-- | src/mem3/src/mem3_nodes.erl | 104 |
1 files changed, 63 insertions, 41 deletions
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl index dd5be1a72..b46b3bb64 100644 --- a/src/mem3/src/mem3_nodes.erl +++ b/src/mem3/src/mem3_nodes.erl @@ -13,8 +13,14 @@ -module(mem3_nodes). -behaviour(gen_server). -vsn(1). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 +]). -export([start_link/0, get_nodelist/0, get_node_info/2]). @@ -28,16 +34,18 @@ start_link() -> get_nodelist() -> try - lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]) - catch error:badarg -> - gen_server:call(?MODULE, get_nodelist) + lists:sort([N || {N, _} <- ets:tab2list(?MODULE)]) + catch + error:badarg -> + gen_server:call(?MODULE, get_nodelist) end. get_node_info(Node, Key) -> try couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2)) - catch error:badarg -> - gen_server:call(?MODULE, {get_node_info, Node, Key}) + catch + error:badarg -> + gen_server:call(?MODULE, {get_node_info, Node, Key}) end. init([]) -> @@ -47,13 +55,15 @@ init([]) -> {ok, #state{changes_pid = Pid, update_seq = UpdateSeq}}. handle_call(get_nodelist, _From, State) -> - {reply, lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]), State}; + {reply, lists:sort([N || {N, _} <- ets:tab2list(?MODULE)]), State}; handle_call({get_node_info, Node, Key}, _From, State) -> - Resp = try - couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2)) - catch error:badarg -> - error - end, + Resp = + try + couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2)) + catch + error:badarg -> + error + end, {reply, Resp, State}; handle_call({add_node, Node, NodeInfo}, _From, State) -> gen_event:notify(mem3_events, {add_node, Node}), @@ -69,22 +79,26 @@ handle_call(_Call, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) -> +handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid = Pid} = State) -> couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]), StartSeq = State#state.update_seq, - Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end, + Seq = + case Reason of + {seq, EndSeq} -> EndSeq; + _ -> StartSeq + end, erlang:send_after(5000, self(), start_listener), {noreply, State#state{update_seq = Seq}}; handle_info(start_listener, #state{update_seq = Seq} = State) -> {NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end), - {noreply, State#state{changes_pid=NewPid}}; + {noreply, State#state{changes_pid = NewPid}}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, #state{}=State, _Extra) -> +code_change(_OldVsn, #state{} = State, _Extra) -> {ok, State}. %% internal functions @@ -100,10 +114,10 @@ initialize_nodelist() -> first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{deleted=true}, Acc) -> +first_fold(#full_doc_info{deleted = true}, Acc) -> {ok, Acc}; -first_fold(#full_doc_info{id=Id}=DocInfo, Db) -> - {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]), +first_fold(#full_doc_info{id = Id} = DocInfo, Db) -> + {ok, #doc{body = {Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]), ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}), {ok, Db}. @@ -125,31 +139,39 @@ changes_callback({stop, EndSeq}, _) -> exit({seq, EndSeq}); changes_callback({change, {Change}, _}, _) -> Node = couch_util:get_value(<<"id">>, Change), - case Node of <<"_design/", _/binary>> -> ok; _ -> - case mem3_util:is_deleted(Change) of - false -> - {Props} = couch_util:get_value(doc, Change), - gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props}); - true -> - gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) - end + case Node of + <<"_design/", _/binary>> -> + ok; + _ -> + case mem3_util:is_deleted(Change) of + false -> + {Props} = couch_util:get_value(doc, Change), + gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props}); + true -> + gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)}) + end end, {ok, couch_util:get_value(<<"seq">>, Change)}; changes_callback(timeout, _) -> {ok, nil}. insert_if_missing(Db, Nodes) -> - Docs = lists:foldl(fun(Node, Acc) -> - case ets:lookup(?MODULE, Node) of - [_] -> - Acc; - [] -> - ets:insert(?MODULE, {Node, []}), - [#doc{id = couch_util:to_binary(Node)} | Acc] - end - end, [], Nodes), - if Docs =/= [] -> - {ok, _} = couch_db:update_docs(Db, Docs, []); - true -> - {ok, []} + Docs = lists:foldl( + fun(Node, Acc) -> + case ets:lookup(?MODULE, Node) of + [_] -> + Acc; + [] -> + ets:insert(?MODULE, {Node, []}), + [#doc{id = couch_util:to_binary(Node)} | Acc] + end + end, + [], + Nodes + ), + if + Docs =/= [] -> + {ok, _} = couch_db:update_docs(Db, Docs, []); + true -> + {ok, []} end. |