diff options
Diffstat (limited to 'src/rabbit_dead_letter.erl')
-rw-r--r-- | src/rabbit_dead_letter.erl | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl new file mode 100644 index 00000000..b8a2cc9c --- /dev/null +++ b/src/rabbit_dead_letter.erl @@ -0,0 +1,141 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_dead_letter). + +-export([publish/5]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(), + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Msg, Reason, X, RK, QName) -> + DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), + Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), + {Queues, Cycles} = detect_cycles(Reason, DLMsg, + rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery), + ok. + +make_msg(Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + Reason, DLX, RK, #resource{name = QName}) -> + {DeathRoutingKeys, HeadersFun1} = + case RK of + undefined -> {RoutingKeys, fun (H) -> H end}; + _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + ReasonBin = list_to_binary(atom_to_list(Reason)), + TimeSec = rabbit_misc:now_ms() div 1000, + PerMsgTTL = per_msg_ttl_header(Content#content.properties), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = #content{properties = Props} = + rabbit_basic:map_headers(HeadersFun2, Content), + Content2 = Content1#content{properties = + Props#'P_basic'{expiration = undefined}}, + Msg#basic_message{exchange_name = DLX, + id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, + content = Content2}. + +per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> + []; +per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> + [{<<"original-expiration">>, longstr, Expiration}]; +per_msg_ttl_header(_) -> + []. + +detect_cycles(expired, #basic_message{content = Content}, Queues) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + NoCycles = {Queues, []}, + case Headers of + undefined -> + NoCycles; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, Deaths} -> + {Cycling, NotCycling} = + lists:partition(fun (#resource{name = Queue}) -> + is_cycle(Queue, Deaths) + end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; + _ -> + NoCycles + end + end; +detect_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, Deaths), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> lists:all( + fun ({table, D}) -> + {longstr, <<"expired">>} =:= + rabbit_misc:table_lookup(D, <<"reason">>); + (_) -> + false + end, Cycle ++ [H]) + end. + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. |