diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-21 17:36:09 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-02-21 17:36:09 +0000 |
commit | be8cb807748f28021d38c62f158f095874d9d607 (patch) | |
tree | adaf53d6d53717768619451dc3df0f39c4f9c010 | |
parent | ea2fcaea17c27e6d186d1244b9ad918c83dabe92 (diff) | |
download | rabbitmq-server-be8cb807748f28021d38c62f158f095874d9d607.tar.gz |
renames and refactors
-rw-r--r-- | src/rabbit_channel.erl | 24 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 8 |
2 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index abda1c1f..28f3673d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -299,7 +299,7 @@ handle_info({'DOWN', MRef, process, QPid, Reason}, noreply( case dict:find(MRef, ConsumerMonitors) of error -> - handle_non_consumer_down(QPid, Reason, State); + handle_queue_down(QPid, Reason, State); {ok, ConsumerTag} -> handle_consumer_down(MRef, ConsumerTag, State) end). @@ -717,7 +717,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, {Q, undefined}, ConsumerMapping)}, {noreply, - maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)}; + case NoWait of + true -> monitor_consumer(ActualConsumerTag, State1); + false -> State1 + end}; {{error, exclusive_consume_unavailable}, _Q} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", @@ -1085,29 +1088,24 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- -maybe_monitor_consumer(true, ConsumerTag, State) -> - monitor_consumer(ConsumerTag, State); -maybe_monitor_consumer(false, _ConsumerTag, State) -> - State. - monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, consumer_monitors = ConsumerMonitors, capabilities = Capabilities}) -> - case {dict:find(ConsumerTag, ConsumerMapping), - rabbit_misc:table_lookup( - Capabilities, <<"consumer_death_notification">>)} of - {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} -> + {#amqqueue{pid = QPid} = Q, undefined} = dict:fetch(ConsumerTag, + ConsumerMapping), + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> MRef = erlang:monitor(process, QPid), State#ch{consumer_mapping = dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping), consumer_monitors = dict:store(MRef, ConsumerTag, ConsumerMonitors)}; _ -> - %% either already received the cancel or incapable client State end. -handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> +handle_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); none -> [] diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index c5d6ecc4..aa7d2775 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -163,10 +163,10 @@ server_properties(Protocol) -> NormalizedConfigServerProps). server_capabilities(rabbit_framing_amqp_0_9_1) -> - [{<<"publisher_confirms">>, bool, true}, - {<<"exchange_exchange_bindings">>, bool, true}, - {<<"basic.nack">>, bool, true}, - {<<"consumer_death_notification">>, bool, true}]; + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, true}]; server_capabilities(_) -> []. |