diff options
author | Hubert Plociniczak <hubert@lshift.net> | 2009-06-10 14:24:10 +0100 |
---|---|---|
committer | Hubert Plociniczak <hubert@lshift.net> | 2009-06-10 14:24:10 +0100 |
commit | 909bfbe81055ed684aba44be52066637393c2293 (patch) | |
tree | 8e3ebaaf4df079f2ef8cdaa5579dd512bb576601 | |
parent | 347d7f4b57e590c310168ea328adaa3d0dd05e6f (diff) | |
parent | 77eb7ad7c6daa1313d26e84e801bb8c5170dd78c (diff) | |
download | rabbitmq-server-909bfbe81055ed684aba44be52066637393c2293.tar.gz |
Merge bug20801 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7ffb1c8f..41c2d101 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -148,7 +148,7 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> BlockedOld = is_ch_blocked(OldCR), @@ -630,7 +630,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), - Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, + Consumer = #consumer{tag = ConsumerTag, + ack_required = not(NoAck)}, store_ch_record(C#cr{consumers = [Consumer | Consumers], limiter_pid = LimiterPid}), if Consumers == [] -> @@ -638,13 +639,14 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, true -> ok end, + ExclusiveConsumer = + if ExclusiveConsume -> {ChPid, ConsumerTag}; + true -> ExistingHolder + end, State1 = State#q{has_had_consumers = true, - exclusive_consumer = - if - ExclusiveConsume -> {ChPid, ConsumerTag}; - true -> ExistingHolder - end, - round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, + exclusive_consumer = ExclusiveConsumer, + round_robin = queue:in({ChPid, Consumer}, + RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), reply(ok, run_poke_burst(State1)) end |