summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-08 17:09:52 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-08 17:09:52 +0000
commitf0ceca7de46e4d70ec74020fff48dded3096c09e (patch)
tree5670a4f3b8affb262d2dd76c7d5f28d9a0d93e1c
parentc56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff)
downloadrabbitmq-server-bug25951.tar.gz
extract dead-letter publishing from rabbit_amqqueue_processbug25951
-rw-r--r--src/rabbit_amqqueue_process.erl113
-rw-r--r--src/rabbit_dead_letter.erl141
2 files changed, 142 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6391ebe6..bd258ecf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -717,117 +717,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- dead_letter_publish(Msg, Reason, X, RK, QName),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
{Res, State#q{backing_queue_state = BQS2}}.
-dead_letter_publish(Msg, Reason, X, RK, QName) ->
- DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
- {Queues, Cycles} = detect_dead_letter_cycles(
- Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
- ok.
-
stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-
-detect_dead_letter_cycles(expired,
- #basic_message{content = Content}, Queues) ->
- #content{properties = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- NoCycles = {Queues, []},
- case Headers of
- undefined ->
- NoCycles;
- _ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, Deaths} ->
- {Cycling, NotCycling} =
- lists:partition(
- fun (#resource{name = Queue}) ->
- is_dead_letter_cycle(Queue, Deaths)
- end, Queues),
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- Deaths],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- {NotCycling, [[QName | OldQueues1] ||
- #resource{name = QName} <- Cycling]};
- _ ->
- NoCycles
- end
- end;
-detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
- {Queues, []}.
-
-is_dead_letter_cycle(Queue, Deaths) ->
- {Cycle, Rest} =
- lists:splitwith(
- fun ({table, D}) ->
- {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
- (_) ->
- true
- end, Deaths),
- %% Is there a cycle, and if so, is it entirely due to expiry?
- case Rest of
- [] -> false;
- [H|_] -> lists:all(
- fun ({table, D}) ->
- {longstr, <<"expired">>} =:=
- rabbit_misc:table_lookup(D, <<"reason">>);
- (_) ->
- false
- end, Cycle ++ [H])
- end.
-
-make_dead_letter_msg(Msg = #basic_message{content = Content,
- exchange_name = Exchange,
- routing_keys = RoutingKeys},
- Reason, DLX, RK, #resource{name = QName}) ->
- {DeathRoutingKeys, HeadersFun1} =
- case RK of
- undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
- end,
- ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = rabbit_misc:now_ms() div 1000,
- PerMsgTTL = per_msg_ttl_header(Content#content.properties),
- HeadersFun2 =
- fun (Headers) ->
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- RKs1 = [{longstr, Key} || Key <- RKs],
- Info = [{<<"reason">>, longstr, ReasonBin},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, TimeSec},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
- end,
- Content1 = #content{properties = Props} =
- rabbit_basic:map_headers(HeadersFun2, Content),
- Content2 = Content1#content{properties =
- Props#'P_basic'{expiration = undefined}},
- Msg#basic_message{exchange_name = DLX,
- id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys,
- content = Content2}.
-
-per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
- [];
-per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
- [{<<"original-expiration">>, longstr, Expiration}];
-per_msg_ttl_header(_) ->
- [].
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1319,14 +1219,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-
-log_cycle_once(Queues) ->
- Key = {queue_cycle, Queues},
- case get(Key) of
- true -> ok;
- undefined -> rabbit_log:warning(
- "Message dropped. Dead-letter queues cycle detected" ++
- ": ~p~nThis cycle will NOT be reported again.~n",
- [Queues]),
- put(Key, true)
- end.
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
new file mode 100644
index 00000000..640b282e
--- /dev/null
+++ b/src/rabbit_dead_letter.erl
@@ -0,0 +1,141 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_dead_letter).
+
+-export([publish/5]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec publish(rabbit_types:message(), atom(), rabbit_types:exchange(),
+ 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Msg, Reason, X, RK, QName) ->
+ DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
+ Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
+ {Queues, Cycles} = detect_cycles(Reason, DLMsg,
+ rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery),
+ ok.
+
+make_msg(Msg = #basic_message{content = Content,
+ exchange_name = Exchange,
+ routing_keys = RoutingKeys},
+ Reason, DLX, RK, #resource{name = QName}) ->
+ {DeathRoutingKeys, HeadersFun1} =
+ case RK of
+ undefined -> {RoutingKeys, fun (H) -> H end};
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
+ TimeSec = rabbit_misc:now_ms() div 1000,
+ PerMsgTTL = per_msg_ttl_header(Content#content.properties),
+ HeadersFun2 =
+ fun (Headers) ->
+ %% The first routing key is the one specified in the
+ %% basic.publish; all others are CC or BCC keys.
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
+ HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
+ Info, Headers))
+ end,
+ Content1 = #content{properties = Props} =
+ rabbit_basic:map_headers(HeadersFun2, Content),
+ Content2 = Content1#content{properties =
+ Props#'P_basic'{expiration = undefined}},
+ Msg#basic_message{exchange_name = DLX,
+ id = rabbit_guid:gen(),
+ routing_keys = DeathRoutingKeys,
+ content = Content2}.
+
+per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
+ [];
+per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
+ [{<<"original-expiration">>, longstr, Expiration}];
+per_msg_ttl_header(_) ->
+ [].
+
+detect_cycles(expired, #basic_message{content = Content}, Queues) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ NoCycles = {Queues, []},
+ case Headers of
+ undefined ->
+ NoCycles;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, Deaths} ->
+ {Cycling, NotCycling} =
+ lists:partition(fun (#resource{name = Queue}) ->
+ is_cycle(Queue, Deaths)
+ end, Queues),
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- Deaths],
+ OldQueues1 = [QName || {longstr, QName} <- OldQueues],
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
+ _ ->
+ NoCycles
+ end
+ end;
+detect_cycles(_Reason, _Msg, Queues) ->
+ {Queues, []}.
+
+is_cycle(Queue, Deaths) ->
+ {Cycle, Rest} =
+ lists:splitwith(
+ fun ({table, D}) ->
+ {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
+ (_) ->
+ true
+ end, Deaths),
+ %% Is there a cycle, and if so, is it entirely due to expiry?
+ case Rest of
+ [] -> false;
+ [H|_] -> lists:all(
+ fun ({table, D}) ->
+ {longstr, <<"expired">>} =:=
+ rabbit_misc:table_lookup(D, <<"reason">>);
+ (_) ->
+ false
+ end, Cycle ++ [H])
+ end.
+
+log_cycle_once(Queues) ->
+ Key = {queue_cycle, Queues},
+ case get(Key) of
+ true -> ok;
+ undefined -> rabbit_log:warning(
+ "Message dropped. Dead-letter queues cycle detected" ++
+ ": ~p~nThis cycle will NOT be reported again.~n",
+ [Queues]),
+ put(Key, true)
+ end.