summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:23:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:23:32 +0000
commit4b275612fc07d530bd3dea3f706317d0f558130c (patch)
tree298150db58ae3ed90d46bf8c3804f511ef64c729
parentede1b15456453855b7c211079033af5cf8db051f (diff)
parent016f3c221dbc23b0e9cf8dc7062cbe279d38fb05 (diff)
downloadrabbitmq-server-4b275612fc07d530bd3dea3f706317d0f558130c.tar.gz
merge default into bug24297
-rw-r--r--src/rabbit_queue_consumers.erl18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca47b434..bf3d857d 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -141,24 +141,24 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
State#state{consumers =
- remove_consumer(ChPid, ConsumerTag, Consumers)}
+ remove_consumer(ChPid, CTag, Consumers)}
end.
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
@@ -217,14 +217,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
end.
deliver_to_consumer(FetchFun,
- #consumer{tag = ConsumerTag,
+ #consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in(AckTag, ChAckTags);
@@ -423,9 +423,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
end,
priority_queue:in({ChPid, Consumer}, Priority, Queue).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
end, Queue).
remove_consumers(ChPid, Queue) ->