summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-21 17:36:09 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-21 17:36:09 +0000
commitbe8cb807748f28021d38c62f158f095874d9d607 (patch)
treeadaf53d6d53717768619451dc3df0f39c4f9c010
parentea2fcaea17c27e6d186d1244b9ad918c83dabe92 (diff)
downloadrabbitmq-server-be8cb807748f28021d38c62f158f095874d9d607.tar.gz
renames and refactors
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_reader.erl8
2 files changed, 15 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index abda1c1f..28f3673d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -299,7 +299,7 @@ handle_info({'DOWN', MRef, process, QPid, Reason},
noreply(
case dict:find(MRef, ConsumerMonitors) of
error ->
- handle_non_consumer_down(QPid, Reason, State);
+ handle_queue_down(QPid, Reason, State);
{ok, ConsumerTag} ->
handle_consumer_down(MRef, ConsumerTag, State)
end).
@@ -717,7 +717,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
{Q, undefined},
ConsumerMapping)},
{noreply,
- maybe_monitor_consumer(NoWait, ActualConsumerTag, State1)};
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
{{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
@@ -1085,29 +1088,24 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
-maybe_monitor_consumer(true, ConsumerTag, State) ->
- monitor_consumer(ConsumerTag, State);
-maybe_monitor_consumer(false, _ConsumerTag, State) ->
- State.
-
monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
consumer_monitors = ConsumerMonitors,
capabilities = Capabilities}) ->
- case {dict:find(ConsumerTag, ConsumerMapping),
- rabbit_misc:table_lookup(
- Capabilities, <<"consumer_death_notification">>)} of
- {{ok, {#amqqueue{pid = QPid} = Q, undefined}}, {bool, true}} ->
+ {#amqqueue{pid = QPid} = Q, undefined} = dict:fetch(ConsumerTag,
+ ConsumerMapping),
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
MRef = erlang:monitor(process, QPid),
State#ch{consumer_mapping =
dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
consumer_monitors =
dict:store(MRef, ConsumerTag, ConsumerMonitors)};
_ ->
- %% either already received the cancel or incapable client
State
end.
-handle_non_consumer_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+handle_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
none -> []
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c5d6ecc4..aa7d2775 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -163,10 +163,10 @@ server_properties(Protocol) ->
NormalizedConfigServerProps).
server_capabilities(rabbit_framing_amqp_0_9_1) ->
- [{<<"publisher_confirms">>, bool, true},
- {<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true},
- {<<"consumer_death_notification">>, bool, true}];
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}];
server_capabilities(_) ->
[].