diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 19:44:45 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-04 19:44:45 +0000 |
commit | dc87918e5f1160d7f7c72b55ab484720072b26af (patch) | |
tree | f6c89c1d777a30328dac8d8657df8b730f5f6591 | |
parent | 93cad02186b4c0ac27a02090a9e0c719ac6dfc8d (diff) | |
download | rabbitmq-server-dc87918e5f1160d7f7c72b55ab484720072b26af.tar.gz |
refactor: disentangle head dropping and publishing
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 |
1 files changed, 12 insertions, 17 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a4a30021..77514383 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -560,7 +560,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, {false, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message}, +deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), @@ -571,28 +571,23 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2} -> - BQS1 = publish_max(Delivery, Props, Delivered, State2), + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_drop_head(State2), + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + State3#q{backing_queue_state = BQS1}) end. -publish_max(#delivery{message = Message, - sender = SenderPid}, - Props, Delivered, #q{backing_queue = BQ, - backing_queue_state = BQS, - max_length = undefined}) -> - BQ:publish(Message, Props, Delivered, SenderPid, BQS); -publish_max(#delivery{message = Message, - msg_seq_no = MsgSeqNo, - sender = SenderPid}, - Props, Delivered, #q{backing_queue = BQ, - backing_queue_state = BQS, - max_length = MaxLen}) -> +maybe_drop_head(State = #q{max_length = undefined}) -> + State; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> case BQ:len(BQS) >= MaxLen of true -> {{Msg, _IsDelivered, AckTag}, BQS1} = BQ:fetch(true, BQS), (dead_letter_fun(maxlen))([{Msg, AckTag}]), - BQ:publish(Message, Props, Delivered, SenderPid, BQS1); - false -> BQ:publish(Message, Props, Delivered, SenderPid, BQS) + State#q{backing_queue_state = BQS1}; + false -> State end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, |