diff options
author | Adam Kocoloski <adam@cloudant.com> | 2014-06-03 14:25:59 -0400 |
---|---|---|
committer | Robert Newson <rnewson@apache.org> | 2014-07-23 18:08:01 +0100 |
commit | cd07cb8c04df1510253718df7f63d6783e3ec0a7 (patch) | |
tree | e0b58093a0d4ccb30fd2d5d5eeb876d75af95001 | |
parent | 23cda378322851e8d09c850510ed75119cc83adf (diff) | |
download | couchdb-cd07cb8c04df1510253718df7f63d6783e3ec0a7.tar.gz |
Configure buffer limit by message count
This allows an operator to decide how large the buffers should be. It
also provides an escape valve to clear the buffer entirely.
-rw-r--r-- | src/rexi_buffer.erl | 19 |
1 files changed, 12 insertions, 7 deletions
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl index 874ec3c87..f75399c10 100644 --- a/src/rexi_buffer.erl +++ b/src/rexi_buffer.erl @@ -29,9 +29,6 @@ count = 0 }). -%% TODO Leverage os_mon to discover available memory in the system --define (MAX_MEMORY, 17179869184). - start_link(ServerId) -> gen_server:start_link({local, ServerId}, ?MODULE, nil, []). @@ -41,7 +38,12 @@ send(Dest, Msg) -> init(_) -> - {ok, #state{}}. + %% TODO Leverage os_mon to discover available memory in the system + Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), + {ok, #state{max_count = Max}}. + +handle_call(erase_buffer, _From, State) -> + {reply, ok, State#state{buffer = queue:new(), count = 0}, 0}; handle_call(get_buffered_count, _From, State) -> {reply, State#state.count, State, 0}. @@ -49,7 +51,7 @@ handle_call(get_buffered_count, _From, State) -> handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) -> margaret_counter:increment([erlang, rexi, buffered]), Q2 = queue:in({Dest, Msg}, Q), - case should_drop() of + case should_drop(State) of true -> {noreply, State#state{buffer = queue:drop(Q2)}, 0}; false -> @@ -94,11 +96,14 @@ handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) -> terminate(_Reason, _State) -> ok. +code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) -> + Max = list_to_integer(config:get("rexi", "buffer_count", "2000")), + {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}}; code_change(_OldVsn, State, _Extra) -> {ok, State}. -should_drop() -> - erlang:memory(total) > ?MAX_MEMORY. +should_drop(#state{count = Count, max_count = Max}) -> + Count >= Max. get_node({_, Node}) when is_atom(Node) -> Node; |