summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-20 13:33:53 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-20 13:33:53 +0000
commitd6ab3821defa19f30551e34aba7360d0e80e1ecb (patch)
treef888535f749e9e2ac505db5f14d4137d7042a6bd
parent27eaa00d5ecec481133512a980a76557e69fd92d (diff)
downloadrabbitmq-server-d6ab3821defa19f30551e34aba7360d0e80e1ecb.tar.gz
Abstract the waiting into rabbit_amqqueue.
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_channel.erl15
2 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 85d1f283..4a7324a9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- assert_equivalence/5,
+ wait_for_recovery/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -90,6 +90,9 @@
-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
+-spec(wait_for_recovery/2 ::
+ (pid(), name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
+ rabbit_types:error('not_found')).
-spec(assert_equivalence/5 ::
(rabbit_types:amqqueue(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
@@ -378,6 +381,14 @@ with_or_die(Name, F) ->
({absent, Q}) -> rabbit_misc:absent(Q)
end).
+wait_for_recovery(OldQPid, QName) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{pid = OldQPid}} -> timer:sleep(25),
+ wait_for_recovery(OldQPid, QName);
+ {ok, Q} -> {ok, Q};
+ {error, not_found} -> {error, not_found}
+ end.
+
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3e800cf1..dac02b21 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1270,16 +1270,11 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
queue_down_consumer_action(QPid, QName, CTag, CMap) ->
{_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of
- {bool, true} ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = QPid}} -> timer:sleep(25),
- queue_down_consumer_action(
- QPid, QName, CTag, CMap);
- {ok, _Q} -> {recover, ConsumeSpec};
- {error, not_found} -> remove
- end;
- _ ->
- remove
+ {bool, true} -> case rabbit_amqqueue:wait_for_recovery(QPid, QName) of
+ {ok, _Q} -> {recover, ConsumeSpec};
+ {error, not_found} -> remove
+ end;
+ _ -> remove
end.
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->