From e4116d4ace857b190b38feee25d31bda06bb07f5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Dec 2014 16:17:57 +0000 Subject: Backport the part of 505868f421db which fixes ram_bytes when requeueing an in-memory message to delta, and do the same for beta. --- src/rabbit_variable_queue.erl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d076b534..6415eb6d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1485,7 +1485,12 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - {m(trim_msg_status(MsgStatus1)), State1}. + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + case {MsgStatus1#msg_status.msg =:= undefined, + MsgStatus2#msg_status.msg =:= undefined} of + {false, true} -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; + _ -> {MsgStatus2, State1} + end. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1521,8 +1526,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), + State3 = case MsgStatus#msg_status.msg of + undefined -> State2; + _ -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State2)} + upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -- cgit v1.2.1 From 3990a8390f6f11abdb88f3059fd1e0fcb46eb5bb Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Dec 2014 16:36:56 +0000 Subject: Minor refactor. --- src/rabbit_variable_queue.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6415eb6d..1da3de26 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. +msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1486,10 +1488,9 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - case {MsgStatus1#msg_status.msg =:= undefined, - MsgStatus2#msg_status.msg =:= undefined} of - {false, true} -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; - _ -> {MsgStatus2, State1} + case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of + true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; + _ -> {MsgStatus2, State1} end. %% Rebuild queue, inserting sequence ids to maintain ordering @@ -1526,10 +1527,11 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - State3 = case MsgStatus#msg_status.msg of - undefined -> State2; - _ -> upd_ram_bytes(-1, MsgStatus, State2) - end, + State3 = + case msg_in_ram(MsgStatus) of + false -> State2; + true -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). -- cgit v1.2.1