diff options
author | Adam Kocoloski <adam@cloudant.com> | 2013-10-15 22:46:52 -0400 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2014-07-23 18:02:32 +0100 |
commit | 1030906b85de7a398baf6363b9d3f6f21e93b257 (patch) | |
tree | 6924e82ab85e1d1b7999556bbf9e00da842d4cb6 | |
parent | 11b3859aae49c1a575cd70e401c9add9e578ca6b (diff) | |
download | couchdb-1030906b85de7a398baf6363b9d3f6f21e93b257.tar.gz |
Rejigger the governor implementation
Previously we'd spawn a number of processes to send messages to
'noconnect' / 'nosuspend' nodes. Now we're buffering the messages that
need to be sent directly in each governor and sending them one at a
time. This prevents the net_kernel from tipping over in the noconnect
case. We also now decide whether to drop messages based on the node's
total memory consumption (erlang:memory(total)) instead of a cap on the
number of spawned sender processes, since we no longer spawn a process
per message.
BugzID: 23717
BugzID: 23718
-rw-r--r-- | src/rexi_governor.erl | 97 | ||||
-rw-r--r-- | src/rexi_utils.erl | 8 |
2 files changed, 62 insertions, 43 deletions
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl index 876165d29..fdf8c93b2 100644 --- a/src/rexi_governor.erl +++ b/src/rexi_governor.erl @@ -18,54 +18,67 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pids = ets:new(pids, [set]), - spawn_max = 10000, - spawn_cnt = 0, - drop_cnt = 0}). - -init([PidSpawnMax]) -> - {ok, #state{spawn_max = PidSpawnMax}}. - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast({spawn_and_track, Dest, Msg}, - #state{pids = Pids, - spawn_max = SpawnMax, - spawn_cnt = SC, - drop_cnt = DC} = State) -> - {NewSC, NewDC} = - case ets:info(Pids, size) < SpawnMax of - true -> - {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]), - ets:insert(Pids, {Pid, Ref}), - {SC + 1, DC}; - false -> - % drop message on floor - {SC, DC + 1} - end, - {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}}; +-export ([ + send/2, + start_link/1 +]). + +-record(state, { + buffer = queue:new(), + count = 0 +}). -handle_cast(nodeout, #state{pids = Pids} = State) -> - % kill all the pids - ets:foldl(fun({P, _Ref}, Acc) -> - exit(P, kill), - Acc - end, [], Pids), - ets:delete_all_objects(Pids), - {noreply, State}. +%% TODO Leverage os_mon to discover available memory in the system +-define (MAX_MEMORY, 17179869184). -handle_info({'DOWN', _, process, Pid, normal}, - #state{pids = Pids} = State) -> - ets:delete(Pids, Pid), - {noreply, State}; +start_link(ServerId) -> + gen_server:start_link({local, ServerId}, ?MODULE, nil, []). -handle_info({'DOWN', _, process, _Pid, killed}, State) -> - {noreply, State}. +send(Dest, Msg) -> + Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])), + gen_server:cast(Server, {deliver, Dest, Msg}). + + +init(_) -> + {ok, #state{}}. + +handle_call(get_buffered_count, _From, State) -> + {reply, State#state.count, State, 0}. 
+ +handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> + margaret_counter:increment([erlang, rexi, buffered]), + Q2 = queue:in({Dest, Msg}, Q), + case should_drop() of + true -> + {noreply, State#state{buffer = queue:drop(Q2)}, 0}; + false -> + {noreply, State#state{buffer = Q2, count = C+1}, 0} + end. + +handle_info(timeout, State) -> + #state{buffer = Q, count = C} = State, + case queue:out_r(Q) of + {{value, {Dest, Msg}}, Q2} -> + erlang:send(Dest, Msg); + {empty, Q2} -> + ok + end, + if C > 1 -> + {noreply, State#state{buffer = Q2, count = C-1}, 0}; + true -> + {noreply, State#state{buffer = Q2, count = 0}} + end. terminate(_Reason, _State) -> ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. + +should_drop() -> + erlang:memory(total) > ?MAX_MEMORY. + +get_node({_, Node}) when is_atom(Node) -> + Node; +get_node(Pid) when is_pid(Pid) -> + node(Pid). diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl index 6e037571c..3c89ca94e 100644 --- a/src/rexi_utils.erl +++ b/src/rexi_utils.erl @@ -29,7 +29,13 @@ server_pid(Node) -> %% @doc send a message as quickly as possible send(Dest, Msg) -> - rexi_gov_manager:send(Dest, Msg). + case erlang:send(Dest, Msg, [noconnect, nosuspend]) of + ok -> + ok; + _ -> + % treat nosuspend and noconnect the same + rexi_governor:send(Dest, Msg) + end. %% @doc set up the receive loop with an overall timeout -spec recv([any()], integer(), function(), any(), timeout(), timeout()) -> |