summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-11 15:34:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-11 15:34:50 +0100
commit7164c15a18fc08194d37ddab58a6c12096592755 (patch)
tree050aba895c7673fd971293303cd617d9a46d6197
parentae53956d8488e07e745d1f9a88b7ae251e7f88b6 (diff)
downloadrabbitmq-server-7164c15a18fc08194d37ddab58a6c12096592755.tar.gz
Minor tweaks: reduce distance to default, fix spec, improve assertion, improve comment, cosmetic.
-rw-r--r--src/rabbit_queue_index.erl7
-rw-r--r--src/rabbit_variable_queue.erl8
2 files changed, 10 insertions, 5 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 923abb17..0f572866 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -415,9 +415,8 @@ init_clean(RecoveredCounts, State) ->
lists:foldl(
fun ({Seg, UnackedCount}, SegmentsN) ->
Segment = segment_find_or_new(Seg, Dir, SegmentsN),
- segment_store(
- Segment #segment { unacked = UnackedCount },
- SegmentsN)
+ segment_store(Segment #segment { unacked = UnackedCount },
+ SegmentsN)
end, Segments, RecoveredCounts),
%% the counts above include transient messages, which would be the
%% wrong thing to return
@@ -562,7 +561,7 @@ scan_segments(Fun, Acc, State) ->
%%----------------------------------------------------------------------------
create_pub_record_body(MsgId, #message_properties { expiry = Expiry,
- size = Size}) ->
+ size = Size }) ->
[MsgId, expiry_to_binary(Expiry), <<Size:?SIZE_BITS>>].
expiry_to_binary(undefined) -> <<?NO_EXPIRY:?EXPIRY_BITS>>;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a598dfab..9dcd6c76 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -353,6 +353,8 @@
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -884,6 +886,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
len = Len,
bytes = Bytes,
persistent_count = PersistentCount,
+ persistent_bytes = PersistentBytes,
ram_msg_count = RamMsgCount,
ram_bytes = RamBytes}) ->
E1 = ?QUEUE:is_empty(Q1),
@@ -901,6 +904,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = Bytes >= 0,
true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
true = RamMsgCount >= 0,
true = RamMsgCount =< Len,
true = RamBytes >= 0,
@@ -1454,7 +1458,9 @@ publish_alpha(MsgStatus, State) ->
{MsgStatus, inc_ram_msg_count(State)}.
%% [1] We increase the ram_bytes here because we paged the message in
%% to requeue it, not purely because we requeued it. Hence in the
-%% second head it's already accounted for as already in memory.
+%% second head it's already accounted for as already in memory. OTOH
+%% ram_msg_count does not include unacked messages, so it needs
+%% incrementing in both heads.
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),