summaryrefslogtreecommitdiff
path: root/src/mem3/src/mem3_nodes.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mem3/src/mem3_nodes.erl')
-rw-r--r--src/mem3/src/mem3_nodes.erl104
1 files changed, 63 insertions, 41 deletions
diff --git a/src/mem3/src/mem3_nodes.erl b/src/mem3/src/mem3_nodes.erl
index dd5be1a72..b46b3bb64 100644
--- a/src/mem3/src/mem3_nodes.erl
+++ b/src/mem3/src/mem3_nodes.erl
@@ -13,8 +13,14 @@
-module(mem3_nodes).
-behaviour(gen_server).
-vsn(1).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
+-export([
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+]).
-export([start_link/0, get_nodelist/0, get_node_info/2]).
@@ -28,16 +34,18 @@ start_link() ->
get_nodelist() ->
try
- lists:sort([N || {N,_} <- ets:tab2list(?MODULE)])
- catch error:badarg ->
- gen_server:call(?MODULE, get_nodelist)
+ lists:sort([N || {N, _} <- ets:tab2list(?MODULE)])
+ catch
+ error:badarg ->
+ gen_server:call(?MODULE, get_nodelist)
end.
get_node_info(Node, Key) ->
try
couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2))
- catch error:badarg ->
- gen_server:call(?MODULE, {get_node_info, Node, Key})
+ catch
+ error:badarg ->
+ gen_server:call(?MODULE, {get_node_info, Node, Key})
end.
init([]) ->
@@ -47,13 +55,15 @@ init([]) ->
{ok, #state{changes_pid = Pid, update_seq = UpdateSeq}}.
handle_call(get_nodelist, _From, State) ->
- {reply, lists:sort([N || {N,_} <- ets:tab2list(?MODULE)]), State};
+ {reply, lists:sort([N || {N, _} <- ets:tab2list(?MODULE)]), State};
handle_call({get_node_info, Node, Key}, _From, State) ->
- Resp = try
- couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2))
- catch error:badarg ->
- error
- end,
+ Resp =
+ try
+ couch_util:get_value(Key, ets:lookup_element(?MODULE, Node, 2))
+ catch
+ error:badarg ->
+ error
+ end,
{reply, Resp, State};
handle_call({add_node, Node, NodeInfo}, _From, State) ->
gen_event:notify(mem3_events, {add_node, Node}),
@@ -69,22 +79,26 @@ handle_call(_Call, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid=Pid} = State) ->
+handle_info({'DOWN', _, _, Pid, Reason}, #state{changes_pid = Pid} = State) ->
couch_log:notice("~p changes listener died ~p", [?MODULE, Reason]),
StartSeq = State#state.update_seq,
- Seq = case Reason of {seq, EndSeq} -> EndSeq; _ -> StartSeq end,
+ Seq =
+ case Reason of
+ {seq, EndSeq} -> EndSeq;
+ _ -> StartSeq
+ end,
erlang:send_after(5000, self(), start_listener),
{noreply, State#state{update_seq = Seq}};
handle_info(start_listener, #state{update_seq = Seq} = State) ->
{NewPid, _} = spawn_monitor(fun() -> listen_for_changes(Seq) end),
- {noreply, State#state{changes_pid=NewPid}};
+ {noreply, State#state{changes_pid = NewPid}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, #state{}=State, _Extra) ->
+code_change(_OldVsn, #state{} = State, _Extra) ->
{ok, State}.
%% internal functions
@@ -100,10 +114,10 @@ initialize_nodelist() ->
first_fold(#full_doc_info{id = <<"_design/", _/binary>>}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{deleted=true}, Acc) ->
+first_fold(#full_doc_info{deleted = true}, Acc) ->
{ok, Acc};
-first_fold(#full_doc_info{id=Id}=DocInfo, Db) ->
- {ok, #doc{body={Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
+first_fold(#full_doc_info{id = Id} = DocInfo, Db) ->
+ {ok, #doc{body = {Props}}} = couch_db:open_doc(Db, DocInfo, [ejson_body]),
ets:insert(?MODULE, {mem3_util:to_atom(Id), Props}),
{ok, Db}.
@@ -125,31 +139,39 @@ changes_callback({stop, EndSeq}, _) ->
exit({seq, EndSeq});
changes_callback({change, {Change}, _}, _) ->
Node = couch_util:get_value(<<"id">>, Change),
- case Node of <<"_design/", _/binary>> -> ok; _ ->
- case mem3_util:is_deleted(Change) of
- false ->
- {Props} = couch_util:get_value(doc, Change),
- gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
- true ->
- gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
- end
+ case Node of
+ <<"_design/", _/binary>> ->
+ ok;
+ _ ->
+ case mem3_util:is_deleted(Change) of
+ false ->
+ {Props} = couch_util:get_value(doc, Change),
+ gen_server:call(?MODULE, {add_node, mem3_util:to_atom(Node), Props});
+ true ->
+ gen_server:call(?MODULE, {remove_node, mem3_util:to_atom(Node)})
+ end
end,
{ok, couch_util:get_value(<<"seq">>, Change)};
changes_callback(timeout, _) ->
{ok, nil}.
insert_if_missing(Db, Nodes) ->
- Docs = lists:foldl(fun(Node, Acc) ->
- case ets:lookup(?MODULE, Node) of
- [_] ->
- Acc;
- [] ->
- ets:insert(?MODULE, {Node, []}),
- [#doc{id = couch_util:to_binary(Node)} | Acc]
- end
- end, [], Nodes),
- if Docs =/= [] ->
- {ok, _} = couch_db:update_docs(Db, Docs, []);
- true ->
- {ok, []}
+ Docs = lists:foldl(
+ fun(Node, Acc) ->
+ case ets:lookup(?MODULE, Node) of
+ [_] ->
+ Acc;
+ [] ->
+ ets:insert(?MODULE, {Node, []}),
+ [#doc{id = couch_util:to_binary(Node)} | Acc]
+ end
+ end,
+ [],
+ Nodes
+ ),
+ if
+ Docs =/= [] ->
+ {ok, _} = couch_db:update_docs(Db, Docs, []);
+ true ->
+ {ok, []}
end.