summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-01-08 16:10:08 +0000
committerEmile Joubert <emile@rabbitmq.com>2013-01-08 16:10:08 +0000
commit101814b527d93876831751a0c9542ac48879b49e (patch)
treed2dbd46862f12aad482f1a85a31b3b93f3e0b7fe
parent8f3985f1cd22af9bcf76db38e0a4b2c3a3505955 (diff)
parentaf0247f59ca83673146ad98b971823536488aae4 (diff)
downloadrabbitmq-server-101814b527d93876831751a0c9542ac48879b49e.tar.gz
Merged default into bug19375
-rw-r--r--src/rabbit_amqqueue_process.erl133
1 files changed, 65 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f56df9d9..2506ff91 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -711,30 +711,63 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_msgs(State = #q{dlx = DLX,
- backing_queue_state = BQS,
+drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLX of
- undefined -> BQ:dropwhile(ExpirePred, BQS);
- _ -> {Next, Msgs, BQS2} =
- BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/3,
- [], BQS),
- case Msgs of
- [] -> ok;
- _ -> (dead_letter_fun(expired))(
- lists:reverse(Msgs))
- end,
- {Next, BQS2}
- end,
+ {Props, State1} =
+ with_dlx(
+ State#q.dlx,
+ fun (X) -> dead_letter_expired_msgs(ExpirePred, X, State) end,
+ fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS),
+ {Next, State#q{backing_queue_state = BQS1}} end),
ensure_ttl_timer(case Props of
undefined -> undefined;
#message_properties{expiry = Exp} -> Exp
- end, State#q{backing_queue_state = BQS1}).
-
-accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+ end, State1).
+
+with_dlx(undefined, _With, Without) -> Without();
+with_dlx(DLX, With, Without) -> case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> With(X);
+ {error, not_found} -> Without()
+ end.
+
+dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
+ end, expired, X, State).
+
+dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
+ {ok, Acc1, BQS1}
+ end, rejected, X, State),
+ State1.
+
+dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, Reason,
+ X, RK, SeqNo, QName) of
+ [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
+ QPids -> {AckImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
+ {Res, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
ensure_ttl_timer(undefined, State) ->
State;
@@ -755,11 +788,8 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-dead_letter_fun(Reason) ->
- fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
-
-dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
- DLMsg = make_dead_letter_msg(Reason, Msg, State),
+dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
+ DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
@@ -838,19 +868,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
end
end.
-make_dead_letter_msg(Reason,
- Msg = #basic_message{content = Content,
+make_dead_letter_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
+ Reason, DLX, RK, #resource{name = QName}) ->
{DeathRoutingKeys, HeadersFun1} =
- case DlxRoutingKey of
+ case RK of
undefined -> {RoutingKeys, fun (H) -> H end};
- _ -> {[DlxRoutingKey],
- fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
+ _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- #resource{name = QName} = qname(State),
TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
@@ -1208,20 +1235,15 @@ handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- noreply(ack(AckTags, ChPid, State));
-
handle_cast({reject, AckTags, false, ChPid}, State) ->
- DLXFun = dead_letter_fun(rejected),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {ok, BQS1} = BQ:ackfold(
- fun (M, A, ok) -> DLXFun([{M, A}]) end,
- ok, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(with_dlx(
+ State#q.dlx,
+ fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end) end,
+ fun () -> ack(AckTags, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
@@ -1267,31 +1289,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- {AckImmediately, State2} =
- lists:foldl(
- fun({Msg, AckTag},
- {Acks, State1 = #q{publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}}) ->
- case dead_letter_publish(Msg, Reason, X, State1) of
- [] -> {[AckTag | Acks], State1};
- QPids -> UC1 = dtree:insert(
- SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- {Acks,
- State1#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}}
- end
- end, {[], State}, Msgs),
- cleanup_after_confirm(AckImmediately, State2);
- {error, not_found} ->
- cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State)
- end;
-
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq