diff options
author | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-09 17:52:18 +0000 |
---|---|---|
committer | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-09 17:52:18 +0000 |
commit | c05eadffe1a8a9141e669bd169b611adb0097f2f (patch) | |
tree | b108a8da2a3d59a4ebcabfbff31ccfb9a46eb174 | |
parent | 0e9717aaf780e7cc37ff07da68d6c880a32950e4 (diff) | |
download | rabbitmq-server-c05eadffe1a8a9141e669bd169b611adb0097f2f.tar.gz |
Separate calculating the credit from updating the state, and do it properly.
-rw-r--r-- | src/rabbit_limiter.erl | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 6798f7e5..a9ecf2e0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -162,7 +162,7 @@ handle_call(unblock, _From, State) -> maybe_notify_reply(irrelevant, State, State#lim{blocked = false}); handle_call({set_credit, CTag, Credit, Count, Drain}, _From, State) -> - maybe_notify_reply(CTag, State, update_credit(CTag, Credit, Count, Drain, State)); + maybe_notify_reply(CTag, State, reset_credit(CTag, Credit, Count, Drain, State)); handle_call(is_blocked, _From, State) -> {reply, blocked(State), State}. @@ -244,21 +244,31 @@ send_drained(ChPid, CTag, Count) -> available = 0, drain = true}). +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are + %% in flight, so we bottom out at 0. + Credit = erlang:max(0, Count0 + Credit0 - Count), + update_credit(CTag, Credit, Count, Drain, State). + +%% Store the credit update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> State#lim{credits = dict:erase(CTag, Credits)}; -%% Edge case: if the queue has nothing in it, and drain is set, we want to -%% send a credit. update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> - New = case dict:find(CTag, Credits) of - #credit{ count = OldCount, - credit = OldCredit } = Old -> - Old#credit{ credit = erlang:max( - 0, OldCount + OldCredit - Count), - count = Count, drain = Drain }; - _ -> #credit{ count = Count, credit = Credit, drain = Drain } - end, - State#lim{credits = dict:store(CTag, New, Credits)}. + State#lim{credits = dict:store(CTag, + #credit{credit = Credit, + count = Count, + drain = Drain}, Credits)}. blocked(#lim{blocked = Blocked}) -> Blocked. |