diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-04 22:24:51 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-04 22:24:51 +0000 |
commit | bac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (patch) | |
tree | e06ba80e34012378d353bb03f49c12bd31ad8b66 /src/rabbit_variable_queue.erl | |
parent | b6058d0b1bef5c5f9eddff225ff2accc70eea086 (diff) | |
parent | c8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff) | |
download | rabbitmq-server-bac67caafb6c00f2141a1da98d29c29dfb6bf8d9.tar.gz |
merge default into bug23882
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 37 |
1 files changed, 18 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7f702409..67c4cc3c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -271,13 +271,13 @@ msg_on_disk, index_on_disk, msg_props - }). + }). -record(delta, { start_seq_id, %% start_seq_id is inclusive count, end_seq_id %% end_seq_id is exclusive - }). + }). -record(tx, { pending_messages, pending_acks }). @@ -517,8 +517,7 @@ publish(Msg, MsgProps, State) -> a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { guid = Guid }, - #message_properties { - needs_confirming = NeedsConfirming }, + #message_properties { needs_confirming = NeedsConfirming }, State = #vqstate { async_callback = Callback, len = 0 }) -> case NeedsConfirming of true -> blind_confirm(Callback, gb_sets:singleton(Guid)); @@ -639,12 +638,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { %% 3. If an ack is required, add something sensible to PA {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {undefined, State} + end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), Len1 = Len - 1, @@ -787,8 +786,8 @@ ram_duration(State = #vqstate { RamAckCount = gb_trees:size(RamAckIndex), Duration = %% msgs+acks / (msgs+acks/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso - AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of + case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of true -> infinity; false -> (RamMsgCountPrev + RamMsgCount + RamAckCount + RamAckCountPrev) / @@ -1404,7 +1403,7 @@ accumulate_ack_init() -> {[], orddict:new()}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, index_on_disk = false }, - {PersistentSeqIdsAcc, GuidsByStore}) -> + {PersistentSeqIdsAcc, GuidsByStore}) -> {PersistentSeqIdsAcc, GuidsByStore}; accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {PersistentSeqIdsAcc, GuidsByStore}) -> @@ -1825,12 +1824,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> multiple_routing_keys() -> transform_storage( - fun ({basic_message, ExchangeName, Routing_Key, Content, - Guid, Persistent}) -> - {ok, {basic_message, ExchangeName, [Routing_Key], Content, - Guid, Persistent}}; - (_) -> {error, corrupt_message} - end), + fun ({basic_message, ExchangeName, Routing_Key, Content, + Guid, Persistent}) -> + {ok, {basic_message, ExchangeName, [Routing_Key], Content, + Guid, Persistent}}; + (_) -> {error, corrupt_message} + end), ok. |