summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-04-12 14:42:11 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-04-12 14:42:11 +0100
commitb382e07f8ef319dfbe46e4d874aa97e01233694c (patch)
tree96260cc41f53136f6478400d72309c9f0e8487f5
parent13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff)
downloadrabbitmq-server-b382e07f8ef319dfbe46e4d874aa97e01233694c.tar.gz
Only ban cycles that are entirely due to expiry
-rw-r--r--src/rabbit_amqqueue_process.erl37
1 files changed, 27 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b016c4d2..b0b37bb2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -846,7 +846,7 @@ dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
- DLMsg, rabbit_exchange:route(X, Delivery)),
+ Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
{_, DeliveredQPids} = rabbit_amqqueue:deliver(
rabbit_amqqueue:lookup(Queues), Delivery),
@@ -895,7 +895,8 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
+detect_dead_letter_cycles(expired,
+ #basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -904,22 +905,38 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, DeathTables} ->
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- DeathTables],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- OldQueuesSet = ordsets:from_list(OldQueues1),
+ {array, Deaths} ->
{Cycling, NotCycling} =
lists:partition(
- fun(Queue) ->
- ordsets:is_element(Queue#resource.name,
- OldQueuesSet)
+ fun (#resource{name = Queue}) ->
+ is_dead_letter_cycle(Queue, Deaths)
end, Queues),
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- Deaths],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
{NotCycling, [[QName | OldQueues1] ||
#resource{name = QName} <- Cycling]};
_ ->
NoCycles
end
+ end;
+detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
+ {Queues, []}.
+
+is_dead_letter_cycle(Queue, Deaths) ->
+ {Cycle, Rest} =
+ lists:splitwith(
+ fun ({table, D}) ->
+ {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
+ (_) ->
+ true
+ end, lists:reverse(Deaths)),
+ %% Is there a cycle, and if so, is it entirely due to expiry?
+ case Rest of
+ [] -> false;
+ [H|_] -> [] =:= [D || {table, D} <- Cycle ++ [H],
+ {longstr, <<"expired">>} =/=
+ rabbit_misc:table_lookup(D, <<"reason">>)]
end.
make_dead_letter_msg(Msg = #basic_message{content = Content,