diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-18 17:42:56 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-18 17:42:56 +0000 |
commit | d8c2900d40317202aa509ef18116c7058ddc7f16 (patch) | |
tree | 4b854f12da58c8c83d27b15f3d947f5d2bca113a | |
parent | 362a03d37666bdfa0c6e761c8ef92c861a5c848b (diff) | |
download | rabbitmq-server-d8c2900d40317202aa509ef18116c7058ddc7f16.tar.gz |
Make the consumer mapping store the queues, not the queuenames
-rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 17 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 2fc19256..ff8ff800 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -680,9 +680,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), - Q#amqqueue.pid} + Q} end) of - {ok, QPid} -> + {ok, Q = #amqqueue{pid = QPid}} -> {ConsumerMonitors1, MRef} = case rabbit_misc:table_lookup( Capabilities, @@ -696,10 +696,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end, {noreply, State#ch{consumer_mapping = dict:store(ActualConsumerTag, - {QueueName, MRef}, + {Q, MRef}, ConsumerMapping), consumer_monitors = ConsumerMonitors1}}; - {{error, exclusive_consume_unavailable}, _QPid} -> + {{error, exclusive_consume_unavailable}, _Q} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", [rabbit_misc:rs(QueueName)]) @@ -719,7 +719,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, {QueueName, MRef}} -> + {ok, {Q, MRef}} -> ConsumerMonitors1 = case MRef of undefined -> ConsumerMonitors; @@ -729,21 +729,15 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag, ConsumerMapping), consumer_monitors = ConsumerMonitors1}, - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - %% In order to ensure that no more messages - %% are sent to the consumer after the - %% cancel_ok has been sent, we get the - %% queue process to send the cancel_ok on - %% our behalf. If we were sending the - %% cancel_ok ourselves it might overtake a - %% message sent previously by the queue. - rabbit_amqqueue:basic_cancel( - Q, self(), ConsumerTag, - ok_msg(NoWait, #'basic.cancel_ok'{ - consumer_tag = ConsumerTag})) - end) of + %% In order to ensure that no more messages are sent to + %% the consumer after the cancel_ok has been sent, we get + %% the queue process to send the cancel_ok on our + %% behalf. If we were sending the cancel_ok ourselves it + %% might overtake a message sent previously by the queue. + case rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, + ok_msg(NoWait, #'basic.cancel_ok'{ + consumer_tag = ConsumerTag})) of ok -> {noreply, NewState}; {error, not_found} -> @@ -1262,16 +1256,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). consumer_queues(Consumers) -> - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, {QueueName, _MRef}, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end]. + lists:usort([QPid || + {_Key, {#amqqueue{pid = QPid}, _MRef}} + <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for |