diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:33:53 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-20 13:33:53 +0000 |
commit | d6ab3821defa19f30551e34aba7360d0e80e1ecb (patch) | |
tree | f888535f749e9e2ac505db5f14d4137d7042a6bd | |
parent | 27eaa00d5ecec481133512a980a76557e69fd92d (diff) | |
download | rabbitmq-server-d6ab3821defa19f30551e34aba7360d0e80e1ecb.tar.gz |
Abstract the waiting into rabbit_amqqueue.
-rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 15 |
2 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 85d1f283..4a7324a9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ delete_immediately/1, delete/3, purge/1, forget_all_durable/1]). -export([pseudo_queue/2]). -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, - assert_equivalence/5, + wait_for_recovery/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). @@ -90,6 +90,9 @@ -spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B). -spec(with_or_die/2 :: (name(), qfun(A)) -> A | rabbit_types:channel_exit()). +-spec(wait_for_recovery/2 :: + (pid(), name()) -> rabbit_types:ok(rabbit_types:amqqueue()) | + rabbit_types:error('not_found')). -spec(assert_equivalence/5 :: (rabbit_types:amqqueue(), boolean(), boolean(), rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) @@ -378,6 +381,14 @@ with_or_die(Name, F) -> ({absent, Q}) -> rabbit_misc:absent(Q) end). +wait_for_recovery(OldQPid, QName) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{pid = OldQPid}} -> timer:sleep(25), + wait_for_recovery(OldQPid, QName); + {ok, Q} -> {ok, Q}; + {error, not_found} -> {error, not_found} + end. + assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, Durable, AutoDelete, RequiredArgs, Owner) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3e800cf1..dac02b21 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1270,16 +1270,11 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons, queue_down_consumer_action(QPid, QName, CTag, CMap) -> {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap), case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of - {bool, true} -> - case rabbit_amqqueue:lookup(QName) of - {ok, #amqqueue{pid = QPid}} -> timer:sleep(25), - queue_down_consumer_action( - QPid, QName, CTag, CMap); - {ok, _Q} -> {recover, ConsumeSpec}; - {error, not_found} -> remove - end; - _ -> - remove + {bool, true} -> case rabbit_amqqueue:wait_for_recovery(QPid, QName) of + {ok, _Q} -> {recover, ConsumeSpec}; + {error, not_found} -> remove + end; + _ -> remove end. handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> |