summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-18 17:42:56 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-18 17:42:56 +0000
commitd8c2900d40317202aa509ef18116c7058ddc7f16 (patch)
tree4b854f12da58c8c83d27b15f3d947f5d2bca113a
parent362a03d37666bdfa0c6e761c8ef92c861a5c848b (diff)
downloadrabbitmq-server-d8c2900d40317202aa509ef18116c7058ddc7f16.tar.gz
Make the consumer mapping store the queues, not the queuenames
-rw-r--r--src/rabbit_channel.erl47
1 files changed, 17 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 2fc19256..ff8ff800 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -680,9 +680,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
- Q#amqqueue.pid}
+ Q}
end) of
- {ok, QPid} ->
+ {ok, Q = #amqqueue{pid = QPid}} ->
{ConsumerMonitors1, MRef} =
case rabbit_misc:table_lookup(
Capabilities,
@@ -696,10 +696,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end,
{noreply, State#ch{consumer_mapping =
dict:store(ActualConsumerTag,
- {QueueName, MRef},
+ {Q, MRef},
ConsumerMapping),
consumer_monitors = ConsumerMonitors1}};
- {{error, exclusive_consume_unavailable}, _QPid} ->
+ {{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -719,7 +719,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, {QueueName, MRef}} ->
+ {ok, {Q, MRef}} ->
ConsumerMonitors1 =
case MRef of
undefined -> ConsumerMonitors;
@@ -729,21 +729,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
ConsumerMapping),
consumer_monitors = ConsumerMonitors1},
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- %% In order to ensure that no more messages
- %% are sent to the consumer after the
- %% cancel_ok has been sent, we get the
- %% queue process to send the cancel_ok on
- %% our behalf. If we were sending the
- %% cancel_ok ourselves it might overtake a
- %% message sent previously by the queue.
- rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag,
- ok_msg(NoWait, #'basic.cancel_ok'{
- consumer_tag = ConsumerTag}))
- end) of
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag,
+ ok_msg(NoWait, #'basic.cancel_ok'{
+ consumer_tag = ConsumerTag})) of
ok ->
{noreply, NewState};
{error, not_found} ->
@@ -1262,16 +1256,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, {QueueName, _MRef}, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end].
+ lists:usort([QPid ||
+ {_Key, {#amqqueue{pid = QPid}, _MRef}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for