diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-21 11:44:25 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-21 11:44:25 +0000 |
commit | 84e0ba7b3c20c64c0d3050421735c7046bc66834 (patch) | |
tree | 193b6ffe5b6c42a3a9c4bdb53d42db00f110d200 | |
parent | e6df03faac19d940ba3976fc9007aab0ef6502f5 (diff) | |
download | rabbitmq-server-84e0ba7b3c20c64c0d3050421735c7046bc66834.tar.gz |
rabbit_amqqueue:basic_consume() will wait for recovery anyway, we don't need a special function for that.
-rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 17 |
2 files changed, 8 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4a7324a9..85d1f283 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, - wait_for_recovery/2, assert_equivalence/5, + assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -90,9 +90,6 @@ -spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). --spec(wait_for_recovery/2 :: - (pid(), name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | - rabbit_types:error('not_found')). -spec(assert_equivalence/5 :: (rabbit_types:amqqueue(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) @@ -381,14 +378,6 @@ with_or_die(Name, F) -> ({absent, Q}) -> rabbit_misc:absent(Q) end). -wait_for_recovery(OldQPid, QName) -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = OldQPid}} -> timer:sleep(25), - wait_for_recovery(OldQPid, QName); - {ok, Q} -> {ok, Q}; - {error, not_found} -> {error, not_found} - end. - assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 91dd9d26..1b536b60 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1255,15 +1255,15 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, gb_sets:fold( fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> QName = dict:fetch(QPid, QNames), - case queue_down_consumer_action(QPid, QName, CTag, CMap) of + case queue_down_consumer_action(CTag, CMap) of remove -> cancel_consumer(CTag, QName, StateN); {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} -> - case basic_consume( - QName, NoAck, ConsumerPrefetch, CTag, - Exclusive, Args, true, StateN) of + case catch basic_consume( + QName, NoAck, ConsumerPrefetch, CTag, + Exclusive, Args, true, StateN) of {ok, StateN1} -> StateN1; - {error, _} -> cancel_consumer(CTag, QName, StateN) + _ -> cancel_consumer(CTag, QName, StateN) end end end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags). @@ -1276,13 +1276,10 @@ cancel_consumer(CTag, QName, State = #ch{consumer_mapping = CMap}) -> {queue, QName}]), State#ch{consumer_mapping = dict:erase(CTag, CMap)}. -queue_down_consumer_action(QPid, QName, CTag, CMap) -> +queue_down_consumer_action(CTag, CMap) -> {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of - {bool, true} -> case rabbit_amqqueue:wait_for_recovery(QPid, QName) of - {ok, _Q} -> {recover, ConsumeSpec}; - {error, not_found} -> remove - end; + {bool, true} -> {recover, ConsumeSpec}; _ -> remove end. |