summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-21 11:44:25 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-21 11:44:25 +0000
commit84e0ba7b3c20c64c0d3050421735c7046bc66834 (patch)
tree193b6ffe5b6c42a3a9c4bdb53d42db00f110d200
parente6df03faac19d940ba3976fc9007aab0ef6502f5 (diff)
downloadrabbitmq-server-84e0ba7b3c20c64c0d3050421735c7046bc66834.tar.gz
rabbit_amqqueue:basic_consume() will wait for recovery anyway, we don't need a special function for that.
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_channel.erl17
2 files changed, 8 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4a7324a9..85d1f283 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
delete_immediately/1, delete/3, purge/1, forget_all_durable/1]).
-export([pseudo_queue/2]).
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
- wait_for_recovery/2, assert_equivalence/5,
+ assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
@@ -90,9 +90,6 @@
-spec(with/3 :: (name(), qfun(A), fun((not_found_or_absent()) -> B)) -> A | B).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
--spec(wait_for_recovery/2 ::
- (pid(), name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
-spec(assert_equivalence/5 ::
(rabbit_types:amqqueue(), boolean(), boolean(),
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
@@ -381,14 +378,6 @@ with_or_die(Name, F) ->
({absent, Q}) -> rabbit_misc:absent(Q)
end).
-wait_for_recovery(OldQPid, QName) ->
- case rabbit_amqqueue:lookup(QName) of
- {ok, #amqqueue{pid = OldQPid}} -> timer:sleep(25),
- wait_for_recovery(OldQPid, QName);
- {ok, Q} -> {ok, Q};
- {error, not_found} -> {error, not_found}
- end.
-
assert_equivalence(#amqqueue{durable = Durable,
auto_delete = AutoDelete} = Q,
Durable, AutoDelete, RequiredArgs, Owner) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 91dd9d26..1b536b60 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1255,15 +1255,15 @@ handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
gb_sets:fold(
fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
QName = dict:fetch(QPid, QNames),
- case queue_down_consumer_action(QPid, QName, CTag, CMap) of
+ case queue_down_consumer_action(CTag, CMap) of
remove ->
cancel_consumer(CTag, QName, StateN);
{recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
- case basic_consume(
- QName, NoAck, ConsumerPrefetch, CTag,
- Exclusive, Args, true, StateN) of
+ case catch basic_consume(
+ QName, NoAck, ConsumerPrefetch, CTag,
+ Exclusive, Args, true, StateN) of
{ok, StateN1} -> StateN1;
- {error, _} -> cancel_consumer(CTag, QName, StateN)
+ _ -> cancel_consumer(CTag, QName, StateN)
end
end
end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
@@ -1276,13 +1276,10 @@ cancel_consumer(CTag, QName, State = #ch{consumer_mapping = CMap}) ->
{queue, QName}]),
State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
-queue_down_consumer_action(QPid, QName, CTag, CMap) ->
+queue_down_consumer_action(CTag, CMap) ->
{_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
case rabbit_misc:table_lookup(Args, <<"recover-on-ha-failover">>) of
- {bool, true} -> case rabbit_amqqueue:wait_for_recovery(QPid, QName) of
- {ok, _Q} -> {recover, ConsumeSpec};
- {error, not_found} -> remove
- end;
+ {bool, true} -> {recover, ConsumeSpec};
_ -> remove
end.