diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-25 15:14:27 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-25 15:14:27 +0000 |
commit | f2e1729521131549b6c2f0d12106f299b4a1901a (patch) | |
tree | ea32838903aa8a7af28693de7b4b65452135adb9 | |
parent | 333a2fc960a947ed50d7678acf20edc5762672e4 (diff) | |
parent | 6dfdefb70b21e870d13ee3d54614c91913fb726b (diff) | |
download | rabbitmq-server-f2e1729521131549b6c2f0d12106f299b4a1901a.tar.gz |
Merge bug26070
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 176 |
2 files changed, 108 insertions, 74 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 85d1f283..757f18ac 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -436,7 +436,8 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. -consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -444,6 +445,9 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. +check_bool_arg({bool, _}, _) -> ok; +check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. + check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 56a3cbb6..b9b39ac3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -755,9 +755,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_prefetch = ConsumerPrefetchCount, + _, State = #ch{consumer_prefetch = ConsumerPrefetch, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -769,34 +767,12 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, - - %% We get the queue process to send the consume_ok on our - %% behalf. This is for symmetry with basic.cancel - see - %% the comment in that method for why. - case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> - {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), - rabbit_limiter:pid(Limiter), - rabbit_limiter:is_active(Limiter), - ConsumerPrefetchCount, - ActualConsumerTag, ExclusiveConsume, Args, - ok_msg(NoWait, #'basic.consume_ok'{ - consumer_tag = ActualConsumerTag})), - Q} - end) of - {ok, Q = #amqqueue{pid = QPid, name = QName}} -> - CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), - State1 = monitor_delivering_queue( - NoAck, QPid, QName, - State#ch{consumer_mapping = CM1}), - {noreply, - case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end}; - {{error, exclusive_consume_unavailable}, _Q} -> + case basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State) of + {ok, State1} -> + {noreply, State1}; + {error, exclusive_consume_unavailable} -> rabbit_misc:protocol_error( access_refused, "~s in exclusive use", [rabbit_misc:rs(QueueName)]) @@ -815,7 +791,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, error -> %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); - {ok, Q = #amqqueue{pid = QPid}} -> + {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping), QCons1 = case dict:find(QPid, QCons) of @@ -1174,10 +1150,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag, drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of - {ok, Q} -> ok = rabbit_amqqueue:credit( - Q, self(), CTag, Credit, Drain), - {noreply, State}; - error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed( + "unknown consumer tag '~s'", [CTag]) end; handle_method(_MethodRecord, _Content, _State) -> @@ -1186,26 +1163,55 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +%% We get the queue process to send the consume_ok on our behalf. This +%% is for symmetry with basic.cancel - see the comment in that method +%% for why. +basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, + State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), + ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q} + end) of + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> + CM1 = dict:store( + ActualConsumerTag, + {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, + ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), + {ok, case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end}; + {{error, exclusive_consume_unavailable} = E, _Q} -> + E + end. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, - queue_consumers = QCons, - capabilities = Capabilities}) -> - case rabbit_misc:table_lookup( - Capabilities, <<"consumer_cancel_notify">>) of - {bool, true} -> - #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), - QCons1 = dict:update(QPid, - fun (CTags) -> - gb_sets:insert(ConsumerTag, CTags) - end, - gb_sets:singleton(ConsumerTag), - QCons), - State#ch{queue_monitors = pmon:monitor(QPid, QMons), - queue_consumers = QCons1}; - _ -> - State - end. + queue_consumers = QCons}) -> + {#amqqueue{pid = QPid}, _CParams} = + dict:fetch(ConsumerTag, ConsumerMapping), + QCons1 = dict:update(QPid, fun (CTags) -> + gb_sets:insert(ConsumerTag, CTags) + end, + gb_sets:singleton(ConsumerTag), QCons), + State#ch{queue_monitors = pmon:monitor(QPid, QMons), + queue_consumers = QCons1}. monitor_delivering_queue(NoAck, QPid, QName, State = #ch{queue_names = QNames, @@ -1231,28 +1237,52 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, end. -handle_consuming_queue_down(QPid, - State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - queue_names = QNames}) -> +handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, - ConsumerMapping1 = - gb_sets:fold(fun (CTag, CMap) -> - ok = send(#'basic.cancel'{consumer_tag = CTag, - nowait = true}, - State), - rabbit_event:notify( - consumer_deleted, - [{consumer_tag, CTag}, - {channel, self()}, - {queue, dict:fetch(QPid, QNames)}]), - dict:erase(CTag, CMap) - end, ConsumerMapping, ConsumerTags), - State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = dict:erase(QPid, QCons)}. + gb_sets:fold( + fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> + QName = dict:fetch(QPid, QNames), + case queue_down_consumer_action(CTag, CMap) of + remove -> + cancel_consumer(CTag, QName, StateN); + {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> + case catch basic_consume( %% [0] + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of + {ok, StateN1} -> StateN1; + _ -> cancel_consumer(CTag, QName, StateN) + end + end + end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). + +%% [0] There is a slight danger here that if a queue is deleted and +%% then recreated again the reconsume will succeed even though it was +%% not an HA failover. But the likelihood is not great and most users +%% are unlikely to care. + +cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, + consumer_mapping = CMap}) -> + case rabbit_misc:table_lookup( + Capabilities, <<"consumer_cancel_notify">>) of + {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, State); + _ -> ok + end, + rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + State#ch{consumer_mapping = dict:erase(CTag, CMap)}. + +queue_down_consumer_action(CTag, CMap) -> + {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), + case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of + {bool, true} -> remove; + _ -> {recover, ConsumeSpec} + end. handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. @@ -1427,8 +1457,8 @@ foreach_per_queue(F, UAL) -> rabbit_misc:gb_trees_foreach(F, T). consumer_queues(Consumers) -> - lists:usort([QPid || - {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). + lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} + <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for |