summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-04 19:44:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-04 19:44:45 +0000
commitdc87918e5f1160d7f7c72b55ab484720072b26af (patch)
treef6c89c1d777a30328dac8d8657df8b730f5f6591
parent93cad02186b4c0ac27a02090a9e0c719ac6dfc8d (diff)
downloadrabbitmq-server-dc87918e5f1160d7f7c72b55ab484720072b26af.tar.gz
refactor: disentangle head dropping and publishing
-rw-r--r--src/rabbit_amqqueue_process.erl29
1 files changed, 12 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a4a30021..77514383 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -560,7 +560,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
{false, State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message},
+deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
@@ -571,28 +571,23 @@ deliver_or_enqueue(Delivery = #delivery{message = Message},
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2} ->
- BQS1 = publish_max(Delivery, Props, Delivered, State2),
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_drop_head(State2),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
- State2#q{backing_queue_state = BQS1})
+ State3#q{backing_queue_state = BQS1})
end.
-publish_max(#delivery{message = Message,
- sender = SenderPid},
- Props, Delivered, #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_length = undefined}) ->
- BQ:publish(Message, Props, Delivered, SenderPid, BQS);
-publish_max(#delivery{message = Message,
- msg_seq_no = MsgSeqNo,
- sender = SenderPid},
- Props, Delivered, #q{backing_queue = BQ,
- backing_queue_state = BQS,
- max_length = MaxLen}) ->
+maybe_drop_head(State = #q{max_length = undefined}) ->
+ State;
+maybe_drop_head(State = #q{max_length = MaxLen,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:len(BQS) >= MaxLen of
true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS),
(dead_letter_fun(maxlen))([{Msg, AckTag}]),
- BQ:publish(Message, Props, Delivered, SenderPid, BQS1);
- false -> BQ:publish(Message, Props, Delivered, SenderPid, BQS)
+ State#q{backing_queue_state = BQS1};
+ false -> State
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ,