diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-19 11:28:04 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-19 11:28:04 +0000 |
commit | af213cb722371d6d6ed38bbe095753aa32d49ca5 (patch) | |
tree | 998cb2a844d9e8f9705cc84ac1be2b9caa712e19 | |
parent | 50e549693f5fc26177b5e89944a860e5e624772f (diff) | |
download | rabbitmq-server-af213cb722371d6d6ed38bbe095753aa32d49ca5.tar.gz |
First hack at recovering consumers.
-rw-r--r-- | src/rabbit_channel.erl | 148 |
1 files changed, 88 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 56a3cbb6..600488cd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -755,9 +755,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_prefetch = ConsumerPrefetchCount, + _, State = #ch{consumer_prefetch = ConsumerPrefetch, consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -769,38 +767,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, - - %% We get the queue process to send the consume_ok on our - %% behalf. This is for symmetry with basic.cancel - see - %% the comment in that method for why. - case rabbit_amqqueue:with_exclusive_access_or_die( - QueueName, ConnPid, - fun (Q) -> - {rabbit_amqqueue:basic_consume( - Q, NoAck, self(), - rabbit_limiter:pid(Limiter), - rabbit_limiter:is_active(Limiter), - ConsumerPrefetchCount, - ActualConsumerTag, ExclusiveConsume, Args, - ok_msg(NoWait, #'basic.consume_ok'{ - consumer_tag = ActualConsumerTag})), - Q} - end) of - {ok, Q = #amqqueue{pid = QPid, name = QName}} -> - CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping), - State1 = monitor_delivering_queue( - NoAck, QPid, QName, - State#ch{consumer_mapping = CM1}), - {noreply, - case NoWait of - true -> consumer_monitor(ActualConsumerTag, State1); - false -> State1 - end}; - {{error, exclusive_consume_unavailable}, _Q} -> - rabbit_misc:protocol_error( - access_refused, "~s in exclusive use", - [rabbit_misc:rs(QueueName)]) - end; + {noreply, basic_consume( + QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, State)}; {ok, _} -> %% Attempted reuse of consumer tag. rabbit_misc:protocol_error( @@ -1174,10 +1143,11 @@ handle_method(#'basic.credit'{consumer_tag = CTag, drain = Drain}, _, State = #ch{consumer_mapping = Consumers}) -> case dict:find(CTag, Consumers) of - {ok, Q} -> ok = rabbit_amqqueue:credit( - Q, self(), CTag, Credit, Drain), - {noreply, State}; - error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + {ok, {Q, _Args}} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed( + "unknown consumer tag '~s'", [CTag]) end; handle_method(_MethodRecord, _Content, _State) -> @@ -1186,6 +1156,45 @@ handle_method(_MethodRecord, _Content, _State) -> %%---------------------------------------------------------------------------- +%% We get the queue process to send the consume_ok on our behalf. This +%% is for symmetry with basic.cancel - see the comment in that method +%% for why. +basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, NoWait, + State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_mapping = ConsumerMapping}) -> + case rabbit_amqqueue:with_exclusive_access_or_die( + QueueName, ConnPid, + fun (Q) -> + {rabbit_amqqueue:basic_consume( + Q, NoAck, self(), + rabbit_limiter:pid(Limiter), + rabbit_limiter:is_active(Limiter), + ConsumerPrefetch, ActualConsumerTag, + ExclusiveConsume, Args, + ok_msg(NoWait, #'basic.consume_ok'{ + consumer_tag = ActualConsumerTag})), + Q} + end) of + {ok, Q = #amqqueue{pid = QPid, name = QName}} -> + CM1 = dict:store( + ActualConsumerTag, + {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, + ConsumerMapping), + State1 = monitor_delivering_queue( + NoAck, QPid, QName, + State#ch{consumer_mapping = CM1}), + case NoWait of + true -> consumer_monitor(ActualConsumerTag, State1); + false -> State1 + end; + {{error, exclusive_consume_unavailable}, _Q} -> + rabbit_misc:protocol_error( + access_refused, "~s in exclusive use", + [rabbit_misc:rs(QueueName)]) + end. + consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, @@ -1194,7 +1203,8 @@ consumer_monitor(ConsumerTag, case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> - #amqqueue{pid = QPid} = dict:fetch(ConsumerTag, ConsumerMapping), + {#amqqueue{pid = QPid}, _Args} = + dict:fetch(ConsumerTag, ConsumerMapping), QCons1 = dict:update(QPid, fun (CTags) -> gb_sets:insert(ConsumerTag, CTags) @@ -1231,28 +1241,46 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, end. -handle_consuming_queue_down(QPid, - State = #ch{consumer_mapping = ConsumerMapping, - queue_consumers = QCons, - queue_names = QNames}) -> +handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> ConsumerTags = case dict:find(QPid, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, - ConsumerMapping1 = - gb_sets:fold(fun (CTag, CMap) -> - ok = send(#'basic.cancel'{consumer_tag = CTag, - nowait = true}, - State), - rabbit_event:notify( - consumer_deleted, - [{consumer_tag, CTag}, - {channel, self()}, - {queue, dict:fetch(QPid, QNames)}]), - dict:erase(CTag, CMap) - end, ConsumerMapping, ConsumerTags), - State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = dict:erase(QPid, QCons)}. + gb_sets:fold( + fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> + QName = dict:fetch(QPid, QNames), + case queue_down_consumer_action(QPid, QName, CTag, CMap) of + remove -> + ok = send(#'basic.cancel'{consumer_tag = CTag, + nowait = true}, + State), + rabbit_event:notify( + consumer_deleted, [{consumer_tag, CTag}, + {channel, self()}, + {queue, QName}]), + StateN#ch{consumer_mapping = dict:erase(CTag, CMap)}; + {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> + basic_consume( + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) + end + end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). + +queue_down_consumer_action(QPid, QName, CTag, CMap) -> + {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), + case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of + {bool, true} -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{pid = QPid}} -> timer:sleep(25), + queue_down_consumer_action( + QPid, QName, CTag, CMap); + {ok, _Q} -> {recover, ConsumeSpec}; + {error, not_found} -> remove + end; + _ -> + remove + end. handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. @@ -1427,8 +1455,8 @@ foreach_per_queue(F, UAL) -> rabbit_misc:gb_trees_foreach(F, T). consumer_queues(Consumers) -> - lists:usort([QPid || - {_Key, #amqqueue{pid = QPid}} <- dict:to_list(Consumers)]). + lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _Args}} + <- dict:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received %% for messages delivered to subscribed consumers, but not acks for |