diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:23:32 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:23:32 +0000 |
commit | 4b275612fc07d530bd3dea3f706317d0f558130c (patch) | |
tree | 298150db58ae3ed90d46bf8c3804f511ef64c729 | |
parent | ede1b15456453855b7c211079033af5cf8db051f (diff) | |
parent | 016f3c221dbc23b0e9cf8dc7062cbe279d38fb05 (diff) | |
download | rabbitmq-server-4b275612fc07d530bd3dea3f706317d0f558130c.tar.gz |
merge default into bug24297
-rw-r--r-- | src/rabbit_queue_consumers.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca47b434..bf3d857d 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -141,24 +141,24 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. -remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> +remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Blocked1 = remove_consumer(ChPid, CTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag), update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), State#state{consumers = - remove_consumer(ChPid, ConsumerTag, Consumers)} + remove_consumer(ChPid, CTag, Consumers)} end. erase_ch(ChPid, State = #state{consumers = Consumers}) -> @@ -217,14 +217,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> end. deliver_to_consumer(FetchFun, - #consumer{tag = ConsumerTag, + #consumer{tag = CTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, QName) -> {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of true -> queue:in(AckTag, ChAckTags); @@ -423,9 +423,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> end, priority_queue:in({ChPid, Consumer}, Priority, Queue). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) +remove_consumer(ChPid, CTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= CTag) end, Queue). remove_consumers(ChPid, Queue) -> |