diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-04-12 14:42:11 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-04-12 14:42:11 +0100 |
commit | b382e07f8ef319dfbe46e4d874aa97e01233694c (patch) | |
tree | 96260cc41f53136f6478400d72309c9f0e8487f5 | |
parent | 13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff) | |
download | rabbitmq-server-b382e07f8ef319dfbe46e4d874aa97e01233694c.tar.gz |
Only ban cycles that are entirely due to expiry
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b016c4d2..b0b37bb2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -846,7 +846,7 @@ dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( - DLMsg, rabbit_exchange:route(X, Delivery)), + Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), {_, DeliveredQPids} = rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), @@ -895,7 +895,8 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> +detect_dead_letter_cycles(expired, + #basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -904,22 +905,38 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, DeathTables} -> - OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || - {table, D} <- DeathTables], - OldQueues1 = [QName || {longstr, QName} <- OldQueues], - OldQueuesSet = ordsets:from_list(OldQueues1), + {array, Deaths} -> {Cycling, NotCycling} = lists:partition( - fun(Queue) -> - ordsets:is_element(Queue#resource.name, - OldQueuesSet) + fun (#resource{name = Queue}) -> + is_dead_letter_cycle(Queue, Deaths) end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], {NotCycling, [[QName | OldQueues1] || #resource{name = QName} <- Cycling]}; _ -> NoCycles end + end; +detect_dead_letter_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_dead_letter_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, lists:reverse(Deaths)), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> [] =:= [D || {table, D} <- Cycle ++ [H], + {longstr, <<"expired">>} =/= + rabbit_misc:table_lookup(D, <<"reason">>)] end. make_dead_letter_msg(Msg = #basic_message{content = Content, |