summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAdam Kocoloski <adam@cloudant.com>2014-06-03 14:28:37 -0400
committerRobert Newson <rnewson@apache.org>2014-07-23 18:08:14 +0100
commit455101492947939cb7767c6bd888baf2e33ad497 (patch)
treed39aff25cc480acfcbaed730cab3c716dc3d360f
parentcd07cb8c04df1510253718df7f63d6783e3ec0a7 (diff)
downloadcouchdb-455101492947939cb7767c6bd888baf2e33ad497.tar.gz
Fix counting bug in buffer
Quoting @davisp: There's a bug in rexi_buffer that can lead to the counter in its state running negative. I noticed this on malort looking for memory usage. A quick reading of the code suggests its due to us getting a timeout message with an empty queue. Theoretically this could happen if we've exceeded the MAX_MEMORY threshold when sending a message or even with just grabbing the buffered count when its idle. BugzID: 28049
-rw-r--r--src/rexi_buffer.erl46
1 files changed, 17 insertions, 29 deletions
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
index f75399c10..26f3c9792 100644
--- a/src/rexi_buffer.erl
+++ b/src/rexi_buffer.erl
@@ -58,37 +58,25 @@ handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
{noreply, State#state{buffer = Q2, count = C+1}, 0}
end.
-handle_info(timeout, #state{sender = nil} = State) ->
+handle_info(timeout, #state{sender = nil, buffer = {[],[]}, count = 0}=State) ->
+ {noreply, State};
+handle_info(timeout, #state{sender = nil, count = C} = State) when C > 0 ->
#state{buffer = Q, count = C} = State,
- Sender = case queue:out_r(Q) of
- {{value, {Dest, Msg}}, Q2} ->
- case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
- ok ->
- nil;
- _Else ->
- spawn_monitor(erlang, send, [Dest, Msg])
- end;
- {empty, Q2} ->
- nil
- end,
- if Sender =:= nil, C > 1 ->
- {noreply, State#state{buffer = Q2, count = C-1}, 0};
- true ->
- NewState = State#state{buffer = Q2, sender = Sender, count = C-1},
- % When Sender is nil and C-1 == 0 we're reverting to an
- % idle state with no outstanding or queued messages. We'll
- % use this oppurtunity to hibernate this process and
- % run a garbage collection.
- case {Sender, C-1} of
- {nil, 0} ->
- {noreply, NewState, hibernate};
- _ ->
- {noreply, NewState, infinity}
- end
+ {{value, {Dest, Msg}}, Q2} = queue:out_r(Q),
+ NewState = State#state{buffer = Q2, count = C-1},
+ case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+ ok when C =:= 1 ->
+ % We just sent the last queued messsage, we'll use this opportunity
+ % to hibernate the process and run a garbage collection
+ {noreply, NewState, hibernate};
+ ok when C > 1 ->
+ % Use a zero timeout to recurse into this handler ASAP
+ {noreply, NewState, 0};
+ _Else ->
+ % We're experiencing delays, keep buffering internally
+ Sender = spawn_monitor(erlang, send, [Dest, Msg]),
+ {noreply, NewState#state{sender = Sender}}
end;
-handle_info(timeout, State) ->
- % Waiting on a sender to return
- {noreply, State};
handle_info({'DOWN', Ref, _, Pid, _}, #state{sender = {Pid, Ref}} = State) ->
{noreply, State#state{sender = nil}, 0}.