summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2011-02-09 17:52:18 +0000
committerMichael Bridgen <mikeb@rabbitmq.com>2011-02-09 17:52:18 +0000
commitc05eadffe1a8a9141e669bd169b611adb0097f2f (patch)
treeb108a8da2a3d59a4ebcabfbff31ccfb9a46eb174
parent0e9717aaf780e7cc37ff07da68d6c880a32950e4 (diff)
downloadrabbitmq-server-c05eadffe1a8a9141e669bd169b611adb0097f2f.tar.gz
Separate calculating the credit from updating the state, and do it properly.
-rw-r--r--src/rabbit_limiter.erl34
1 files changed, 22 insertions, 12 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 6798f7e5..a9ecf2e0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -162,7 +162,7 @@ handle_call(unblock, _From, State) ->
maybe_notify_reply(irrelevant, State, State#lim{blocked = false});
handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) ->
- maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Count, Drain, State));
+ maybe_notify_reply(CTag, State, reset_credit(CTag, Credit, Count, Drain, State));
handle_call(is_blocked, _From, State) ->
{reply, blocked(State), State}.
@@ -244,21 +244,31 @@ send_drained(ChPid, CTag, Count) ->
available = 0,
drain = true}).
+%% Assert the credit state. The count may not match ours, in which
+%% case we must rebase the credit.
+%% TODO Edge case: if the queue has nothing in it, and drain is set,
+%% we want to send a basic.credit back.
+reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) ->
+ Count =
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ count = LocalCount }} ->
+ LocalCount;
+ _ -> Count0
+ end,
+ %% Our credit may have been reduced while messages are
+ %% in flight, so we bottom out at 0.
+ Credit = erlang:max(0, Count0 + Credit0 - Count),
+ update_credit(CTag, Credit, Count, Drain, State).
+
+%% Store the credit
update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) ->
State#lim{credits = dict:erase(CTag, Credits)};
-%% Edge case: if the queue has nothing in it, and drain is set, we want to
-%% send a credit.
update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) ->
- New = case dict:find(CTag, Credits) of
- #credit{ count = OldCount,
- credit = OldCredit } = Old ->
- Old#credit{ credit = erlang:max(
- 0, OldCount + OldCredit - Count),
- count = Count, drain = Drain };
- _ -> #credit{ count = Count, credit = Credit, drain = Drain }
- end,
- State#lim{credits = dict:store(CTag, New, Credits)}.
+ State#lim{credits = dict:store(CTag,
+ #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits)}.
blocked(#lim{blocked = Blocked}) -> Blocked.