summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHubert Plociniczak <hubert@lshift.net>2009-06-10 14:24:10 +0100
committerHubert Plociniczak <hubert@lshift.net>2009-06-10 14:24:10 +0100
commit909bfbe81055ed684aba44be52066637393c2293 (patch)
tree8e3ebaaf4df079f2ef8cdaa5579dd512bb576601
parent347d7f4b57e590c310168ea328adaa3d0dd05e6f (diff)
parent77eb7ad7c6daa1313d26e84e801bb8c5170dd78c (diff)
downloadrabbitmq-server-909bfbe81055ed684aba44be52066637393c2293.tar.gz
Merge bug20801 into default
-rw-r--r--src/rabbit_amqqueue_process.erl18
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7ffb1c8f..41c2d101 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -148,7 +148,7 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+ Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
ch_record_state_transition(OldCR, NewCR) ->
BlockedOld = is_ch_blocked(OldCR),
@@ -630,7 +630,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
- Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
+ Consumer = #consumer{tag = ConsumerTag,
+ ack_required = not(NoAck)},
store_ch_record(C#cr{consumers = [Consumer | Consumers],
limiter_pid = LimiterPid}),
if Consumers == [] ->
@@ -638,13 +639,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
true ->
ok
end,
+ ExclusiveConsumer =
+ if ExclusiveConsume -> {ChPid, ConsumerTag};
+ true -> ExistingHolder
+ end,
State1 = State#q{has_had_consumers = true,
- exclusive_consumer =
- if
- ExclusiveConsume -> {ChPid, ConsumerTag};
- true -> ExistingHolder
- end,
- round_robin = queue:in({ChPid, Consumer}, RoundRobin)},
+ exclusive_consumer = ExclusiveConsumer,
+ round_robin = queue:in({ChPid, Consumer},
+ RoundRobin)},
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, run_poke_burst(State1))
end