diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:52:22 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:52:22 +0000 |
commit | e6df03faac19d940ba3976fc9007aab0ef6502f5 (patch) | |
tree | 8d40c4fcbe82393f57291b2830d554a9492b2485 | |
parent | d6ab3821defa19f30551e34aba7360d0e80e1ecb (diff) | |
download | rabbitmq-server-e6df03faac19d940ba3976fc9007aab0ef6502f5.tar.gz |
Attempt at error checking the basic.consume.
-rw-r--r-- | src/rabbit_channel.erl | 53 |
1 files changed, 31 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dac02b21..91dd9d26 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -767,9 +767,16 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, - {noreply, basic_consume( - QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, - ExclusiveConsume, Args, NoWait, State)}; + case basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State) of + {ok, State1} -> + {noreply, State1}; + {error, exclusive_consume_unavailable} -> + rabbit_misc:protocol_error( + access_refused, "~s in exclusive use", + [rabbit_misc:rs(QueueName)]) + end; {ok, _} -> %% Attempted reuse of consumer tag. rabbit_misc:protocol_error( @@ -1185,14 +1192,12 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, State1 = monitor_delivering_queue( NoAck, QPid, QName, State#ch{consumer_mapping = CM1}), - case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end; - {{error, exclusive_consume_unavailable}, _Q} -> - rabbit_misc:protocol_error( - access_refused, "~s in exclusive use", - [rabbit_misc:rs(QueueName)]) + {ok, case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable} = E, _Q} -> + E end. consumer_monitor(ConsumerTag, @@ -1252,21 +1257,25 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, QName = dict:fetch(QPid, QNames), case queue_down_consumer_action(QPid, QName, CTag, CMap) of remove -> - ok = send(#'basic.cancel'{consumer_tag = CTag, - nowait = true}, - StateN), - rabbit_event:notify( - consumer_deleted, [{consumer_tag, CTag}, - {channel, self()}, - {queue, QName}]), - StateN#ch{consumer_mapping = dict:erase(CTag, CMap)}; + cancel_consumer(CTag, QName, StateN); {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> - basic_consume( - QName, NoAck, ConsumerPrefetch, CTag, - Exclusive, Args, true, StateN) + case basic_consume( + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of + {ok, StateN1} -> StateN1; + {error, _} -> cancel_consumer(CTag, QName, StateN) + end end end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). +cancel_consumer(CTag, QName, State = #ch{consumer_mapping = CMap}) -> + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, State), + rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + State#ch{consumer_mapping = dict:erase(CTag, CMap)}. + queue_down_consumer_action(QPid, QName, CTag, CMap) -> {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of |