summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl113
1 files changed, 1 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6391ebe6..bd258ecf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -717,117 +717,17 @@ dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- dead_letter_publish(Msg, Reason, X, RK, QName),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
{Res, State#q{backing_queue_state = BQS2}}.
-dead_letter_publish(Msg, Reason, X, RK, QName) ->
- DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
- Delivery = rabbit_basic:delivery(false, DLMsg, undefined),
- {Queues, Cycles} = detect_dead_letter_cycles(
- Reason, DLMsg, rabbit_exchange:route(X, Delivery)),
- lists:foreach(fun log_cycle_once/1, Cycles),
- rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery),
- ok.
-
stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-
-detect_dead_letter_cycles(expired,
- #basic_message{content = Content}, Queues) ->
- #content{properties = #'P_basic'{headers = Headers}} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- NoCycles = {Queues, []},
- case Headers of
- undefined ->
- NoCycles;
- _ ->
- case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
- {array, Deaths} ->
- {Cycling, NotCycling} =
- lists:partition(
- fun (#resource{name = Queue}) ->
- is_dead_letter_cycle(Queue, Deaths)
- end, Queues),
- OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
- {table, D} <- Deaths],
- OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- {NotCycling, [[QName | OldQueues1] ||
- #resource{name = QName} <- Cycling]};
- _ ->
- NoCycles
- end
- end;
-detect_dead_letter_cycles(_Reason, _Msg, Queues) ->
- {Queues, []}.
-
-is_dead_letter_cycle(Queue, Deaths) ->
- {Cycle, Rest} =
- lists:splitwith(
- fun ({table, D}) ->
- {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>);
- (_) ->
- true
- end, Deaths),
- %% Is there a cycle, and if so, is it entirely due to expiry?
- case Rest of
- [] -> false;
- [H|_] -> lists:all(
- fun ({table, D}) ->
- {longstr, <<"expired">>} =:=
- rabbit_misc:table_lookup(D, <<"reason">>);
- (_) ->
- false
- end, Cycle ++ [H])
- end.
-
-make_dead_letter_msg(Msg = #basic_message{content = Content,
- exchange_name = Exchange,
- routing_keys = RoutingKeys},
- Reason, DLX, RK, #resource{name = QName}) ->
- {DeathRoutingKeys, HeadersFun1} =
- case RK of
- undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
- end,
- ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = rabbit_misc:now_ms() div 1000,
- PerMsgTTL = per_msg_ttl_header(Content#content.properties),
- HeadersFun2 =
- fun (Headers) ->
- %% The first routing key is the one specified in the
- %% basic.publish; all others are CC or BCC keys.
- RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- RKs1 = [{longstr, Key} || Key <- RKs],
- Info = [{<<"reason">>, longstr, ReasonBin},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, TimeSec},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array, RKs1}] ++ PerMsgTTL,
- HeadersFun1(rabbit_basic:prepend_table_header(<<"x-death">>,
- Info, Headers))
- end,
- Content1 = #content{properties = Props} =
- rabbit_basic:map_headers(HeadersFun2, Content),
- Content2 = Content1#content{properties =
- Props#'P_basic'{expiration = undefined}},
- Msg#basic_message{exchange_name = DLX,
- id = rabbit_guid:gen(),
- routing_keys = DeathRoutingKeys,
- content = Content2}.
-
-per_msg_ttl_header(#'P_basic'{expiration = undefined}) ->
- [];
-per_msg_ttl_header(#'P_basic'{expiration = Expiration}) ->
- [{<<"original-expiration">>, longstr, Expiration}];
-per_msg_ttl_header(_) ->
- [].
-
now_micros() -> timer:now_diff(now(), {0,0,0}).
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1319,14 +1219,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-
-log_cycle_once(Queues) ->
- Key = {queue_cycle, Queues},
- case get(Key) of
- true -> ok;
- undefined -> rabbit_log:warning(
- "Message dropped. Dead-letter queues cycle detected" ++
- ": ~p~nThis cycle will NOT be reported again.~n",
- [Queues]),
- put(Key, true)
- end.