diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-22 13:32:14 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-22 13:32:14 +0000 |
commit | 26542563459bfd6ca5a3f19e9a04d28f365ea0af (patch) | |
tree | 24d3cc0a352e1ae870dae3c93cb10c41ce8f1836 | |
parent | 3bcddf991feb35f1e2a7f64a5cad6f4216ce72f2 (diff) | |
download | rabbitmq-server-26542563459bfd6ca5a3f19e9a04d28f365ea0af.tar.gz |
bq api tweak: don't include remaining message count in fetch/drop result
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 31 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 23 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 13 |
6 files changed, 42 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dc258fa6..fe3ed88d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -485,11 +485,10 @@ deliver_msg_to_consumer(DeliverFun, {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> - {{Message, IsDelivered, AckTag, _Remaining}, State1} = - fetch(AckRequired, State), + {Result, State1} = fetch(AckRequired, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State1), - {{Message, IsDelivered, AckTag}, BQ:is_empty(BQS), State2}. + {Result, BQ:is_empty(BQS), State2}. confirm_messages([], State) -> State; @@ -1061,8 +1060,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, case fetch(AckRequired, drop_expired_messages(State1)) of {empty, State2} -> reply(empty, State2); - {{Message, IsDelivered, AckTag, Remaining}, State2} -> - State3 = + {{Message, IsDelivered, AckTag}, State2} -> + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), ChAckTags1 = sets:add_element(AckTag, ChAckTags), @@ -1071,7 +1070,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> State2 end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State3) + reply({ok, BQ:len(BQS), Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, Limiter, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 00de3e17..871becc5 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,13 +26,9 @@ -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: - ('empty' | - %% Message, IsDelivered, AckTag, Remaining_Len - {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). -type(drop_result(Ack) :: - ('empty' | - %% MessageId, AckTag, Remaining_Len - {rabbit_types:msg_id(), Ack, non_neg_integer()})). + ('empty' | {rabbit_types:msg_id(), Ack})). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(async_callback() :: diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 961636b1..ac2048b7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -268,8 +268,7 @@ drain_confirmed(State = #state { backing_queue = BQ, seen_status = SS1, confirmed = [] }}. -fetch(AckRequired, State = #state { gm = GM, - backing_queue = BQ, +fetch(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS, set_delivered = SetDelivered }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), @@ -277,25 +276,19 @@ fetch(AckRequired, State = #state { gm = GM, case Result of empty -> {Result, State1}; - {Message, IsDelivered, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, - {{Message, IsDelivered1, AckTag, Remaining}, + {Message, IsDelivered, AckTag} -> + {{Message, IsDelivered orelse SetDelivered > 0, AckTag}, drop(Message#basic_message.id, AckTag, State1)} end. -drop(AckRequired, State = #state { gm = GM, - backing_queue = BQ, +drop(AckRequired, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {Result, BQS1} = BQ:drop(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, - case Result of - empty -> - {Result, State1}; - {MsgId, AckTag, Remaining} -> - ok = gm:broadcast(GM, {drop, Remaining, 1, AckRequired}), - {Result, drop(MsgId, AckTag, State1)} - end. + {Result, case Result of + empty -> State1; + {MsgId, AckTag} -> drop(MsgId, AckTag, State1) + end}. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, @@ -453,8 +446,12 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, - ack_msg_id = AM }) -> +drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered, + ack_msg_id = AM, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), State #state { set_delivered = lists:max([0, SetDelivered - 1]), ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 3ad8eb77..cb7a2135 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -727,8 +727,7 @@ process_instruction({drop, Length, Dropped, AckRequired}, end, State1 = lists:foldl( fun (const, StateN = #state{backing_queue_state = BQSN}) -> - {{MsgId, AckTag, _Remaining}, BQSN1} = - BQ:drop(AckRequired, BQSN), + {{MsgId, AckTag}, BQSN1} = BQ:drop(AckRequired, BQSN), maybe_store_ack( AckRequired, MsgId, AckTag, StateN #state { backing_queue_state = BQSN1 }) diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 444c7375..408bacd8 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2233,8 +2233,9 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, {{#basic_message { is_persistent = IsPersistent }, - IsDelivered, AckTagN, Rem}, VQM} = + IsDelivered, AckTagN}, VQM} = rabbit_variable_queue:fetch(true, VQN), + Rem = rabbit_variable_queue:len(VQM), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -2326,7 +2327,7 @@ test_variable_queue_requeue(VQ0) -> VQM end, VQ4, Subset), VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag, _}, VQb} = + {{#basic_message{}, true, AckTag}, VQb} = rabbit_variable_queue:fetch(true, VQa), VQb end, VQ5, lists:reverse(Acks)), @@ -2366,14 +2367,16 @@ test_drop(VQ0) -> %% start by sending a messages VQ1 = variable_queue_publish(false, 1, VQ0), %% drop message with AckRequired = true - {{MsgId, AckTag, 0}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + {{MsgId, AckTag}, VQ2} = rabbit_variable_queue:drop(true, VQ1), + true = rabbit_variable_queue:is_empty(VQ2), true = AckTag =/= undefinded, %% drop again -> empty {empty, VQ3} = rabbit_variable_queue:drop(false, VQ2), %% requeue {[MsgId], VQ4} = rabbit_variable_queue:requeue([AckTag], VQ3), %% drop message with AckRequired = false - {{MsgId, undefined, 0}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + {{MsgId, undefined}, VQ5} = rabbit_variable_queue:drop(false, VQ4), + true = rabbit_variable_queue:is_empty(VQ5), VQ5. test_dropwhile(VQ0) -> @@ -2392,7 +2395,7 @@ test_dropwhile(VQ0) -> %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> - {{#basic_message{}, _, _, _}, VQM} = + {{#basic_message{}, _, _}, VQM} = rabbit_variable_queue:fetch(false, VQN), VQM end, VQ2, lists:seq(6, Count)), @@ -2445,7 +2448,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> VQ0; publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + {{_Msg, false, AckTag}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + Len = rabbit_variable_queue:len(VQ2), {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). @@ -2510,8 +2514,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(shutdown, VQ5), VQ7 = variable_queue_init(test_amqqueue(true), true), - {{_Msg1, true, _AckTag1, Count1}, VQ8} = - rabbit_variable_queue:fetch(true, VQ7), + {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), + Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), @@ -2551,8 +2555,9 @@ test_queue_recover() -> rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), - {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = + {{_Msg1, true, _AckTag1}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), + CountMinusOne = rabbit_variable_queue:len(VQ2), _VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2), rabbit_amqqueue:internal_delete(QName, QPid1) end), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 208eb70f..3a025ba3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -591,8 +591,8 @@ dropwhile(Pred, AckRequired, State, Msgs) -> case {Pred(MsgProps), AckRequired} of {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, _, AckTag, _}, State3} = - internal_fetch(true, MsgStatus1, State2), + {{Msg, _IsDelivered, AckTag}, State3} = + internal_fetch(true, MsgStatus1, State2), dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); {true, false} -> {_, State2} = internal_fetch(false, MsgStatus, State1), @@ -619,9 +619,9 @@ drop(AckRequired, State) -> {empty, State1} -> {empty, a(State1)}; {{value, MsgStatus}, State1} -> - {{_Msg, _IsDelivered, AckTag, Remaining}, State2} = + {{_Msg, _IsDelivered, AckTag}, State2} = internal_fetch(AckRequired, MsgStatus, State1), - {{MsgStatus#msg_status.msg_id, AckTag, Remaining}, a(State2)} + {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)} end. ack([], State) -> @@ -1125,14 +1125,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), - Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - {{Msg, IsDelivered, AckTag, Len1}, + {{Msg, IsDelivered, AckTag}, State1 #vqstate { ram_msg_count = RamMsgCount1, out_counter = OutCount + 1, index_state = IndexState2, - len = Len1, + len = Len - 1, persistent_count = PCount1 }}. purge_betas_and_deltas(LensByStore, |