summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_tests.erl30
-rw-r--r--src/rabbit_variable_queue.erl2
2 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c96000a9..9eddb51d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2457,7 +2457,7 @@ with_fresh_variable_queue(Fun) ->
spawn_link(fun() ->
ok = empty_test_queue(),
VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
+ S0 = variable_queue_status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta,
{delta, undefined, 0, undefined}},
@@ -2582,7 +2582,7 @@ variable_queue_with_holes(VQ0) ->
{delta, _, 0, _} -> true;
0 -> true;
_ -> false
- end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ end || {K, V} <- variable_queue_status(VQ8),
lists:member(K, [q1, delta, q3])],
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
@@ -2652,17 +2652,20 @@ test_variable_queue_ack_limiting(VQ0) ->
%% fetch half the messages
{VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
- VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
- {ram_ack_count, Len div 2},
- {ram_msg_count, Len div 2}]),
+ VQ5 = check_variable_queue_status(
+ VQ4, [{len, Len div 2},
+ {messages_unacknowledged_ram, Len div 2},
+ {messages_ready_ram, Len div 2},
+ {messages_ram, Len}]),
%% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
variable_queue_set_ram_duration_target(0, VQ5),
- [{len, Len div 2},
- {target_ram_count, 0},
- {ram_msg_count, 0},
- {ram_ack_count, 0}]),
+ [{len, Len div 2},
+ {target_ram_count, 0},
+ {messages_unacknowledged_ram, 0},
+ {messages_ready_ram, 0},
+ {messages_ram, 0}]),
VQ6.
@@ -2763,7 +2766,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
fun (Duration1, VQ4) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
io:format("~p:~n~p~n",
- [Duration1, rabbit_variable_queue:status(VQ5)]),
+ [Duration1, variable_queue_status(VQ5)]),
VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
@@ -2823,11 +2826,16 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
check_variable_queue_status(VQ0, Props) ->
VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
- S = rabbit_variable_queue:status(VQ1),
+ S = variable_queue_status(VQ1),
io:format("~p~n", [S]),
assert_props(S, Props),
VQ1.
+variable_queue_status(VQ) ->
+ Keys = rabbit_backing_queue:info_keys() -- [backing_queue_status],
+ [{K, rabbit_variable_queue:info(K, VQ)} || K <- Keys] ++
+ rabbit_variable_queue:info(backing_queue_status, VQ).
+
variable_queue_wait_for_shuffling_end(VQ) ->
case credit_flow:blocked() of
false -> VQ;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d6225c77..37bb4ff8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -853,6 +853,7 @@ info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ len = Len,
target_ram_count = TargetRamCount,
next_seq_id = NextSeqId,
rates = #rates { in = AvgIngressRate,
@@ -865,6 +866,7 @@ info(backing_queue_status, #vqstate {
{delta , Delta},
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
+ {len , Len},
{target_ram_count , TargetRamCount},
{next_seq_id , NextSeqId},
{avg_ingress_rate , AvgIngressRate},