summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2014-06-03 14:25:59 -0400
committerRobert Newson <rnewson@apache.org>2014-07-23 18:08:01 +0100
commitcd07cb8c04df1510253718df7f63d6783e3ec0a7 (patch)
treee0b58093a0d4ccb30fd2d5d5eeb876d75af95001
parent23cda378322851e8d09c850510ed75119cc83adf (diff)
downloadcouchdb-cd07cb8c04df1510253718df7f63d6783e3ec0a7.tar.gz
Configure buffer limit by message count
This allows an operator to decide how large the buffers should be. It also provides an escape valve to clear the buffer entirely.
-rw-r--r--src/rexi_buffer.erl19
1 files changed, 12 insertions, 7 deletions
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
index 874ec3c87..f75399c10 100644
--- a/src/rexi_buffer.erl
+++ b/src/rexi_buffer.erl
@@ -29,9 +29,6 @@
count = 0
}).
-%% TODO Leverage os_mon to discover available memory in the system
--define (MAX_MEMORY, 17179869184).
-
start_link(ServerId) ->
gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
@@ -41,7 +38,12 @@ send(Dest, Msg) ->
init(_) ->
- {ok, #state{}}.
+ %% TODO Leverage os_mon to discover available memory in the system
+ Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+ {ok, #state{max_count = Max}}.
+
+handle_call(erase_buffer, _From, State) ->
+ {reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
handle_call(get_buffered_count, _From, State) ->
{reply, State#state.count, State, 0}.
@@ -49,7 +51,7 @@ handle_call(get_buffered_count, _From, State) ->
handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
margaret_counter:increment([erlang, rexi, buffered]),
Q2 = queue:in({Dest, Msg}, Q),
- case should_drop() of
+ case should_drop(State) of
true ->
{noreply, State#state{buffer = queue:drop(Q2)}, 0};
false ->
@@ -94,11 +96,14 @@ handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
terminate(_Reason, _State) ->
ok.
+code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) ->
+ Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+ {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-should_drop() ->
- erlang:memory(total) > ?MAX_MEMORY.
+should_drop(#state{count = Count, max_count = Max}) ->
+ Count >= Max.
get_node({_, Node}) when is_atom(Node) ->
Node;