summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-20 13:52:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-20 13:52:22 +0000
commite6df03faac19d940ba3976fc9007aab0ef6502f5 (patch)
tree8d40c4fcbe82393f57291b2830d554a9492b2485
parentd6ab3821defa19f30551e34aba7360d0e80e1ecb (diff)
downloadrabbitmq-server-e6df03faac19d940ba3976fc9007aab0ef6502f5.tar.gz
Attempt at error checking the basic.consume.
-rw-r--r--src/rabbit_channel.erl53
1 files changed, 31 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dac02b21..91dd9d26 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -767,9 +767,16 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
"amq.ctag");
Other -> Other
end,
- {noreply, basic_consume(
- QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
- ExclusiveConsume, Args, NoWait, State)};
+ case basic_consume(
+ QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
+ ExclusiveConsume, Args, NoWait, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {error, exclusive_consume_unavailable} ->
+ rabbit_misc:protocol_error(
+ access_refused, "~s in exclusive use",
+ [rabbit_misc:rs(QueueName)])
+ end;
{ok, _} ->
%% Attempted reuse of consumer tag.
rabbit_misc:protocol_error(
@@ -1185,14 +1192,12 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
State1 = monitor_delivering_queue(
NoAck, QPid, QName,
State#ch{consumer_mapping = CM1}),
- case NoWait of
- true -> consumer_monitor(ActualConsumerTag, State1);
- false -> State1
- end;
- {{error, exclusive_consume_unavailable}, _Q} ->
- rabbit_misc:protocol_error(
- access_refused, "~s in exclusive use",
- [rabbit_misc:rs(QueueName)])
+ {ok, case NoWait of
+ true -> consumer_monitor(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable} = E, _Q} ->
+ E
end.
consumer_monitor(ConsumerTag,
@@ -1252,21 +1257,25 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
QName = dict:fetch(QPid, QNames),
case queue_down_consumer_action(QPid, QName, CTag, CMap) of
remove ->
- ok = send(#'basic.cancel'{consumer_tag = CTag,
- nowait = true},
- StateN),
- rabbit_event:notify(
- consumer_deleted, [{consumer_tag, CTag},
- {channel, self()},
- {queue, QName}]),
- StateN#ch{consumer_mapping = dict:erase(CTag, CMap)};
+ cancel_consumer(CTag, QName, StateN);
{recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
- basic_consume(
- QName, NoAck, ConsumerPrefetch, CTag,
- Exclusive, Args, true, StateN)
+ case basic_consume(
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN) of
+ {ok, StateN1} -> StateN1;
+ {error, _} -> cancel_consumer(CTag, QName, StateN)
+ end
end
end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
+cancel_consumer(CTag, QName, State = #ch{consumer_mapping = CMap}) ->
+ ok = send(#'basic.cancel'{consumer_tag = CTag,
+ nowait = true}, State),
+ rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag},
+ {channel, self()},
+ {queue, QName}]),
+ State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
+
queue_down_consumer_action(QPid, QName, CTag, CMap) ->
{_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of