summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-01 21:41:29 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-01 21:41:29 +0000
commitde7441ce679272f8f7200b768c00cf1e06996823 (patch)
tree9fe983ab16aeebaa68ee23e9f8b0cb8f9cda4838
parent2ae774b2f225b08ff9b1240d70f0d62c948e3362 (diff)
downloadrabbitmq-server-de7441ce679272f8f7200b768c00cf1e06996823.tar.gz
don't send dead-lettered messages to self() during 'reject' handling
since they all end up in the mailbox, consuming memory
-rw-r--r--src/rabbit_amqqueue_process.erl69
1 files changed, 33 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b908361c..8e20f4e1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -757,6 +757,29 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{dlx_routing_key = RK,
queue_monitors = QMons1,
backing_queue_state = BQS2}}.
+dead_letter_rejected_msgs(AckTags, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ QName = qname(State),
+ {{ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ BQ:ackfold(
+ fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, rejected, X, RK, SeqNo, QName) of
+ [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons};
+ QPids -> {ConfirmImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS, AckTags),
+ {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1),
+ State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}.
+
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined}) ->
@@ -776,11 +799,6 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-dead_letter_fun(Reason) ->
- fun(Msg, AckTag) ->
- gen_server2:cast(self(), {dead_letter, Msg, AckTag, Reason})
- end.
-
dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) ->
DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
@@ -1216,17 +1234,16 @@ handle_cast({reject, AckTags, true, ChPid}, State) ->
handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
noreply(ack(AckTags, ChPid, State));
-handle_cast({reject, AckTags, false, ChPid}, State) ->
- DLXFun = dead_letter_fun(rejected),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {ok, BQS1} = BQ:ackfold(
- fun (M, A, ok) -> DLXFun([{M, A}]) end,
- ok, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
- end));
+handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = DLX}) ->
+ noreply(case rabbit_exchange:lookup(DLX) of
+ {ok, X} -> subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1) ->
+ dead_letter_rejected_msgs(
+ AckTags, X, State1)
+ end);
+ {error, not_found} -> ack(AckTags, ChPid, State)
+ end);
handle_cast(delete_immediately, State) ->
stop(State);
@@ -1272,26 +1289,6 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast({dead_letter, Msg, AckTag, Reason},
- State = #q{dlx = XName,
- dlx_routing_key = RK,
- publish_seqno = SeqNo,
- unconfirmed = UC,
- queue_monitors = QMons}) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} ->
- case dead_letter_publish(Msg, Reason, X, RK, SeqNo, qname(State)) of
- [] -> cleanup_after_confirm([AckTag], State);
- QPids -> UC1 = dtree:insert(SeqNo, QPids, AckTag, UC),
- QMons1 = pmon:monitor_all(QPids, QMons),
- State#q{publish_seqno = SeqNo + 1,
- unconfirmed = UC1,
- queue_monitors = QMons1}
- end;
- {error, not_found} ->
- cleanup_after_confirm([AckTag], State)
- end;
-
handle_cast(start_mirroring, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% lookup again to get policy for init_with_existing_bq