summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-22 13:32:14 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-22 13:32:14 +0000
commit26542563459bfd6ca5a3f19e9a04d28f365ea0af (patch)
tree24d3cc0a352e1ae870dae3c93cb10c41ce8f1836
parent3bcddf991feb35f1e2a7f64a5cad6f4216ce72f2 (diff)
downloadrabbitmq-server-26542563459bfd6ca5a3f19e9a04d28f365ea0af.tar.gz
bq api tweak: don't include remaining message count in fetch/drop result
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_backing_queue.erl8
-rw-r--r--src/rabbit_mirror_queue_master.erl31
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_tests.erl23
-rw-r--r--src/rabbit_variable_queue.erl13
6 files changed, 42 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dc258fa6..fe3ed88d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -485,11 +485,10 @@ deliver_msg_to_consumer(DeliverFun,
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->
- {{Message, IsDelivered, AckTag, _Remaining}, State1} =
- fetch(AckRequired, State),
+ {Result, State1} = fetch(AckRequired, State),
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State1),
- {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}.
+ {Result, BQ:is_empty(BQS), State2}.
confirm_messages([], State) ->
State;
@@ -1061,8 +1060,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case fetch(AckRequired, drop_expired_messages(State1)) of
{empty, State2} ->
reply(empty, State2);
- {{Message, IsDelivered, AckTag, Remaining}, State2} ->
- State3 =
+ {{Message, IsDelivered, AckTag}, State2} ->
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
ChAckTags1 = sets:add_element(AckTag, ChAckTags),
@@ -1071,7 +1070,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> State2
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State3)
+ reply({ok, BQ:len(BQS), Msg}, State3)
end;
handle_call({basic_consume, NoAck, ChPid, Limiter,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 00de3e17..871becc5 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,13 +26,9 @@
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
- ('empty' |
- %% MessageId, AckTag, Remaining_Len
- {rabbit_types:msg_id(), Ack, non_neg_integer()})).
+ ('empty' | {rabbit_types:msg_id(), Ack})).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
-type(async_callback() ::
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 961636b1..ac2048b7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -268,8 +268,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
seen_status = SS1,
confirmed = [] }}.
-fetch(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+fetch(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
@@ -277,25 +276,19 @@ fetch(AckRequired, State = #state { gm = GM,
case Result of
empty ->
{Result, State1};
- {Message, IsDelivered, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- {{Message, IsDelivered1, AckTag, Remaining},
+ {Message, IsDelivered, AckTag} ->
+ {{Message, IsDelivered orelse SetDelivered > 0, AckTag},
drop(Message#basic_message.id, AckTag, State1)}
end.
-drop(AckRequired, State = #state { gm = GM,
- backing_queue = BQ,
+drop(AckRequired, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:drop(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
- case Result of
- empty ->
- {Result, State1};
- {MsgId, AckTag, Remaining} ->
- ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}),
- {Result, drop(MsgId, AckTag, State1)}
- end.
+ {Result, case Result of
+ empty -> State1;
+ {MsgId, AckTag} -> drop(MsgId, AckTag, State1)
+ end}.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
@@ -453,8 +446,12 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
- ack_msg_id = AM }) ->
+drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
+ ack_msg_id = AM,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
State #state { set_delivered = lists:max([0, SetDelivered - 1]),
ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 3ad8eb77..cb7a2135 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -727,8 +727,7 @@ process_instruction({drop, Length, Dropped, AckRequired},
end,
State1 = lists:foldl(
fun (const, StateN = #state{backing_queue_state = BQSN}) ->
- {{MsgId, AckTag, _Remaining}, BQSN1} =
- BQ:drop(AckRequired, BQSN),
+ {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN),
maybe_store_ack(
AckRequired, MsgId, AckTag,
StateN #state { backing_queue_state = BQSN1 })
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 444c7375..408bacd8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2233,8 +2233,9 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
Rem = Len - N,
{{#basic_message { is_persistent = IsPersistent },
- IsDelivered, AckTagN, Rem}, VQM} =
+ IsDelivered, AckTagN}, VQM} =
rabbit_variable_queue:fetch(true, VQN),
+ Rem = rabbit_variable_queue:len(VQM),
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
@@ -2326,7 +2327,7 @@ test_variable_queue_requeue(VQ0) ->
VQM
end, VQ4, Subset),
VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag, _}, VQb} =
+ {{#basic_message{}, true, AckTag}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
VQb
end, VQ5, lists:reverse(Acks)),
@@ -2366,14 +2367,16 @@ test_drop(VQ0) ->
%% start by sending a messages
VQ1 = variable_queue_publish(false, 1, VQ0),
%% drop message with AckRequired = true
- {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1),
+ true = rabbit_variable_queue:is_empty(VQ2),
true = AckTag =/= undefinded,
%% drop again -> empty
{empty, VQ3} = rabbit_variable_queue:drop(false, VQ2),
%% requeue
{[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3),
%% drop message with AckRequired = false
- {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4),
+ true = rabbit_variable_queue:is_empty(VQ5),
VQ5.
test_dropwhile(VQ0) ->
@@ -2392,7 +2395,7 @@ test_dropwhile(VQ0) ->
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
- {{#basic_message{}, _, _, _}, VQM} =
+ {{#basic_message{}, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ2, lists:seq(6, Count)),
@@ -2445,7 +2448,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
VQ0;
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ Len = rabbit_variable_queue:len(VQ2),
{_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
publish_fetch_and_ack(N-1, Len, VQ3).
@@ -2510,8 +2514,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5),
VQ7 = variable_queue_init(test_amqqueue(true), true),
- {{_Msg1, true, _AckTag1, Count1}, VQ8} =
- rabbit_variable_queue:fetch(true, VQ7),
+ {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
+ Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
@@ -2551,8 +2555,9 @@ test_queue_recover() ->
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
- {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ {{_Msg1, true, _AckTag1}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
+ CountMinusOne = rabbit_variable_queue:len(VQ2),
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
rabbit_amqqueue:internal_delete(QName, QPid1)
end),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 208eb70f..3a025ba3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -591,8 +591,8 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, _, AckTag, _}, State3} =
- internal_fetch(true, MsgStatus1, State2),
+ {{Msg, _IsDelivered, AckTag}, State3} =
+ internal_fetch(true, MsgStatus1, State2),
dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
{true, false} ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
@@ -619,9 +619,9 @@ drop(AckRequired, State) ->
{empty, State1} ->
{empty, a(State1)};
{{value, MsgStatus}, State1} ->
- {{_Msg, _IsDelivered, AckTag, Remaining}, State2} =
+ {{_Msg, _IsDelivered, AckTag}, State2} =
internal_fetch(AckRequired, MsgStatus, State1),
- {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)}
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
end.
ack([], State) ->
@@ -1125,14 +1125,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
PCount1 = PCount - one_if(IsPersistent andalso not AckRequired),
- Len1 = Len - 1,
RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined),
- {{Msg, IsDelivered, AckTag, Len1},
+ {{Msg, IsDelivered, AckTag},
State1 #vqstate { ram_msg_count = RamMsgCount1,
out_counter = OutCount + 1,
index_state = IndexState2,
- len = Len1,
+ len = Len - 1,
persistent_count = PCount1 }}.
purge_betas_and_deltas(LensByStore,