summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-04 22:24:51 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-04 22:24:51 +0000
commitbac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (patch)
treee06ba80e34012378d353bb03f49c12bd31ad8b66 /src/rabbit_variable_queue.erl
parentb6058d0b1bef5c5f9eddff225ff2accc70eea086 (diff)
parentc8044c53b6a8eed5b685ff263b4ffbcba37a98c7 (diff)
downloadrabbitmq-server-bac67caafb6c00f2141a1da98d29c29dfb6bf8d9.tar.gz
merge default into bug23882
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl37
1 files changed, 18 insertions, 19 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 7f702409..67c4cc3c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -271,13 +271,13 @@
msg_on_disk,
index_on_disk,
msg_props
- }).
+ }).
-record(delta,
{ start_seq_id, %% start_seq_id is inclusive
count,
end_seq_id %% end_seq_id is exclusive
- }).
+ }).
-record(tx, { pending_messages, pending_acks }).
@@ -517,8 +517,7 @@ publish(Msg, MsgProps, State) ->
a(reduce_memory_use(State1)).
publish_delivered(false, #basic_message { guid = Guid },
- #message_properties {
- needs_confirming = NeedsConfirming },
+ #message_properties { needs_confirming = NeedsConfirming },
State = #vqstate { async_callback = Callback, len = 0 }) ->
case NeedsConfirming of
true -> blind_confirm(Callback, gb_sets:singleton(Guid));
@@ -639,12 +638,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
%% 3. If an ack is required, add something sensible to PA
{AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
+ true -> StateN = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+ {SeqId, StateN};
+ false -> {undefined, State}
+ end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
Len1 = Len - 1,
@@ -787,8 +786,8 @@ ram_duration(State = #vqstate {
RamAckCount = gb_trees:size(RamAckIndex),
Duration = %% msgs+acks / (msgs+acks/sec) == sec
- case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
- AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of
+ case (AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso
+ AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0) of
true -> infinity;
false -> (RamMsgCountPrev + RamMsgCount +
RamAckCount + RamAckCountPrev) /
@@ -1404,7 +1403,7 @@ accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false },
- {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
{PersistentSeqIdsAcc, GuidsByStore}) ->
@@ -1825,12 +1824,12 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
multiple_routing_keys() ->
transform_storage(
- fun ({basic_message, ExchangeName, Routing_Key, Content,
- Guid, Persistent}) ->
- {ok, {basic_message, ExchangeName, [Routing_Key], Content,
- Guid, Persistent}};
- (_) -> {error, corrupt_message}
- end),
+ fun ({basic_message, ExchangeName, Routing_Key, Content,
+ Guid, Persistent}) ->
+ {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+ Guid, Persistent}};
+ (_) -> {error, corrupt_message}
+ end),
ok.