summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Adam Kocoloski <adam@cloudant.com> 2013-10-15 22:46:52 -0400
committer: Robert Newson <rnewson@apache.org> 2014-07-23 18:02:32 +0100
commit: 1030906b85de7a398baf6363b9d3f6f21e93b257 (patch)
tree: 6924e82ab85e1d1b7999556bbf9e00da842d4cb6
parent: 11b3859aae49c1a575cd70e401c9add9e578ca6b (diff)
download: couchdb-1030906b85de7a398baf6363b9d3f6f21e93b257.tar.gz
Rejigger the governor implementation
Previously we'd spawn a number of processes to send messages to 'noconnect' / 'nosuspend' nodes. Now we're buffering the messages that need to be sent directly in each governor and sending them one at a time. This prevents the net_kernel from tipping over in the noconnect case.

We also decide whether to drop messages based on memory consumption in the node instead of process limits (since we're not spawning anymore).

BugzID: 23717
BugzID: 23718
-rw-r--r-- src/rexi_governor.erl 97
-rw-r--r-- src/rexi_utils.erl 8
2 files changed, 62 insertions, 43 deletions
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
index 876165d29..fdf8c93b2 100644
--- a/src/rexi_governor.erl
+++ b/src/rexi_governor.erl
@@ -18,54 +18,67 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {pids = ets:new(pids, [set]),
- spawn_max = 10000,
- spawn_cnt = 0,
- drop_cnt = 0}).
-
-init([PidSpawnMax]) ->
- {ok, #state{spawn_max = PidSpawnMax}}.
-
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
-
-handle_cast({spawn_and_track, Dest, Msg},
- #state{pids = Pids,
- spawn_max = SpawnMax,
- spawn_cnt = SC,
- drop_cnt = DC} = State) ->
- {NewSC, NewDC} =
- case ets:info(Pids, size) < SpawnMax of
- true ->
- {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]),
- ets:insert(Pids, {Pid, Ref}),
- {SC + 1, DC};
- false ->
- % drop message on floor
- {SC, DC + 1}
- end,
- {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}};
+-export ([
+ send/2,
+ start_link/1
+]).
+
+-record(state, {
+ buffer = queue:new(),
+ count = 0
+}).
-handle_cast(nodeout, #state{pids = Pids} = State) ->
- % kill all the pids
- ets:foldl(fun({P, _Ref}, Acc) ->
- exit(P, kill),
- Acc
- end, [], Pids),
- ets:delete_all_objects(Pids),
- {noreply, State}.
+%% TODO Leverage os_mon to discover available memory in the system
+-define (MAX_MEMORY, 17179869184).
-handle_info({'DOWN', _, process, Pid, normal},
- #state{pids = Pids} = State) ->
- ets:delete(Pids, Pid),
- {noreply, State};
+start_link(ServerId) ->
+ gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
-handle_info({'DOWN', _, process, _Pid, killed}, State) ->
- {noreply, State}.
+send(Dest, Msg) ->
+ Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])),
+ gen_server:cast(Server, {deliver, Dest, Msg}).
+
+
+init(_) ->
+ {ok, #state{}}.
+
+handle_call(get_buffered_count, _From, State) ->
+ {reply, State#state.count, State, 0}.
+
+handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
+ margaret_counter:increment([erlang, rexi, buffered]),
+ Q2 = queue:in({Dest, Msg}, Q),
+ case should_drop() of
+ true ->
+ {noreply, State#state{buffer = queue:drop(Q2)}, 0};
+ false ->
+ {noreply, State#state{buffer = Q2, count = C+1}, 0}
+ end.
+
+handle_info(timeout, State) ->
+ #state{buffer = Q, count = C} = State,
+ case queue:out_r(Q) of
+ {{value, {Dest, Msg}}, Q2} ->
+ erlang:send(Dest, Msg);
+ {empty, Q2} ->
+ ok
+ end,
+ if C > 1 ->
+ {noreply, State#state{buffer = Q2, count = C-1}, 0};
+ true ->
+ {noreply, State#state{buffer = Q2, count = 0}}
+ end.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+should_drop() ->
+ erlang:memory(total) > ?MAX_MEMORY.
+
+get_node({_, Node}) when is_atom(Node) ->
+ Node;
+get_node(Pid) when is_pid(Pid) ->
+ node(Pid).
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 6e037571c..3c89ca94e 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -29,7 +29,13 @@ server_pid(Node) ->
%% @doc send a message as quickly as possible
send(Dest, Msg) ->
- rexi_gov_manager:send(Dest, Msg).
+ case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+ ok ->
+ ok;
+ _ ->
+ % treat nosuspend and noconnect the same
+ rexi_governor:send(Dest, Msg)
+ end.
%% @doc set up the receive loop with an overall timeout
-spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->