diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 17:09:52 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 17:09:52 +0000 |
commit | f0ceca7de46e4d70ec74020fff48dded3096c09e (patch) | |
tree | 5670a4f3b8affb262d2dd76c7d5f28d9a0d93e1c | |
parent | c56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff) | |
download | rabbitmq-server-bug25951.tar.gz |
extract dead-letter publishing from rabbit_amqqueue_processbug25951
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 113 | ||||
-rw-r--r-- | src/rabbit_dead_letter.erl | 141 |
2 files changed, 142 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6391ebe6..bd258ecf 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -717,117 +717,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, QName = qname(State), {Res, Acks1, BQS1} = Fun(fun (Msg, AckTag, Acks) -> - dead_letter_publish(Msg, Reason, X, RK, QName), + rabbit_dead_letter:publish(Msg, Reason, X, RK, QName), [AckTag | Acks] end, [], BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), {Res, State#q{backing_queue_state = BQS2}}. -dead_letter_publish(Msg, Reason, X, RK, QName) -> - DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), - {Queues, Cycles} = detect_dead_letter_cycles( - Reason, DLMsg, rabbit_exchange:route(X, Delivery)), - lists:foreach(fun log_cycle_once/1, Cycles), - rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), - ok. - stop(State) -> stop(noreply, State). stop(noreply, State) -> {stop, normal, State}; stop(Reply, State) -> {stop, normal, Reply, State}. - -detect_dead_letter_cycles(expired, - #basic_message{content = Content}, Queues) -> - #content{properties = #'P_basic'{headers = Headers}} = - rabbit_binary_parser:ensure_content_decoded(Content), - NoCycles = {Queues, []}, - case Headers of - undefined -> - NoCycles; - _ -> - case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, Deaths} -> - {Cycling, NotCycling} = - lists:partition( - fun (#resource{name = Queue}) -> - is_dead_letter_cycle(Queue, Deaths) - end, Queues), - OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || - {table, D} <- Deaths], - OldQueues1 = [QName || {longstr, QName} <- OldQueues], - {NotCycling, [[QName | OldQueues1] || - #resource{name = QName} <- Cycling]}; - _ -> - NoCycles - end - end; -detect_dead_letter_cycles(_Reason, _Msg, Queues) -> - {Queues, []}. - -is_dead_letter_cycle(Queue, Deaths) -> - {Cycle, Rest} = - lists:splitwith( - fun ({table, D}) -> - {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); - (_) -> - true - end, Deaths), - %% Is there a cycle, and if so, is it entirely due to expiry? - case Rest of - [] -> false; - [H|_] -> lists:all( - fun ({table, D}) -> - {longstr, <<"expired">>} =:= - rabbit_misc:table_lookup(D, <<"reason">>); - (_) -> - false - end, Cycle ++ [H]) - end. - -make_dead_letter_msg(Msg = #basic_message{content = Content, - exchange_name = Exchange, - routing_keys = RoutingKeys}, - Reason, DLX, RK, #resource{name = QName}) -> - {DeathRoutingKeys, HeadersFun1} = - case RK of - undefined -> {RoutingKeys, fun (H) -> H end}; - _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} - end, - ReasonBin = list_to_binary(atom_to_list(Reason)), - TimeSec = rabbit_misc:now_ms() div 1000, - PerMsgTTL = per_msg_ttl_header(Content#content.properties), - HeadersFun2 = - fun (Headers) -> - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - RKs1 = [{longstr, Key} || Key <- RKs], - Info = [{<<"reason">>, longstr, ReasonBin}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, TimeSec}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, - HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, - Info, Headers)) - end, - Content1 = #content{properties = Props} = - rabbit_basic:map_headers(HeadersFun2, Content), - Content2 = Content1#content{properties = - Props#'P_basic'{expiration = undefined}}, - Msg#basic_message{exchange_name = DLX, - id = rabbit_guid:gen(), - routing_keys = DeathRoutingKeys, - content = Content2}. - -per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> - []; -per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> - [{<<"original-expiration">>, longstr, Expiration}]; -per_msg_ttl_header(_) -> - []. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1319,14 +1219,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). - -log_cycle_once(Queues) -> - Key = {queue_cycle, Queues}, - case get(Key) of - true -> ok; - undefined -> rabbit_log:warning( - "Message dropped. Dead-letter queues cycle detected" ++ - ": ~p~nThis cycle will NOT be reported again.~n", - [Queues]), - put(Key, true) - end. diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl new file mode 100644 index 00000000..640b282e --- /dev/null +++ b/src/rabbit_dead_letter.erl @@ -0,0 +1,141 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. +%% + +-module(rabbit_dead_letter). + +-export([publish/5]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(), + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Msg, Reason, X, RK, QName) -> + DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), + Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + {Queues, Cycles} = detect_cycles(Reason, DLMsg, + rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery), + ok. + +make_msg(Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + Reason, DLX, RK, #resource{name = QName}) -> + {DeathRoutingKeys, HeadersFun1} = + case RK of + undefined -> {RoutingKeys, fun (H) -> H end}; + _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + end, + ReasonBin = list_to_binary(atom_to_list(Reason)), + TimeSec = rabbit_misc:now_ms() div 1000, + PerMsgTTL = per_msg_ttl_header(Content#content.properties), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL, + HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = #content{properties = Props} = + rabbit_basic:map_headers(HeadersFun2, Content), + Content2 = Content1#content{properties = + Props#'P_basic'{expiration = undefined}}, + Msg#basic_message{exchange_name = DLX, + id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, + content = Content2}. + +per_msg_ttl_header(#'P_basic'{expiration = undefined}) -> + []; +per_msg_ttl_header(#'P_basic'{expiration = Expiration}) -> + [{<<"original-expiration">>, longstr, Expiration}]; +per_msg_ttl_header(_) -> + []. + +detect_cycles(expired, #basic_message{content = Content}, Queues) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + NoCycles = {Queues, []}, + case Headers of + undefined -> + NoCycles; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, Deaths} -> + {Cycling, NotCycling} = + lists:partition(fun (#resource{name = Queue}) -> + is_cycle(Queue, Deaths) + end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; + _ -> + NoCycles + end + end; +detect_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, Deaths), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> lists:all( + fun ({table, D}) -> + {longstr, <<"expired">>} =:= + rabbit_misc:table_lookup(D, <<"reason">>); + (_) -> + false + end, Cycle ++ [H]) + end. + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. |