diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 19:55:38 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 19:55:38 +0000 |
commit | 0c603bc18f3276b7e1a72712f63f32aeb6de35e0 (patch) | |
tree | 2781b20ff33ae1b4e876e400b15f5575692fbd83 | |
parent | 75d9df8f53a5dfe92386b65ac21e7c1789532ed2 (diff) | |
download | rabbitmq-server-bug25372.tar.gz |
drop IsDelivered from bq:{fetchwhile,ackfold}bug25372
since we don't need it
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 14 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 11 |
4 files changed, 18 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ee5122c..b5ad1ac0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -720,7 +720,7 @@ drop_expired_messages(State = #q{dlx = DLX, undefined -> BQ:dropwhile(ExpirePred, BQS); _ -> {Next, Msgs, BQS2} = BQ:fetchwhile(ExpirePred, - fun accumulate_msgs/4, + fun accumulate_msgs/3, [], BQS), case Msgs of [] -> ok; @@ -734,7 +734,7 @@ drop_expired_messages(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State#q{backing_queue_state = BQS1}). -accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc]. +accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc]. ensure_ttl_timer(undefined, State) -> State; @@ -1203,9 +1203,8 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {ok, BQS1} = BQ:ackfold( - fun (Msg, _IsDelivered, AckTag, ok) -> - DLXFun([{Msg, AckTag}]) - end, ok, BQS, AckTags), + fun (M, A, ok) -> DLXFun([{M, A}]) end, + ok, BQS, AckTags), State1#q{backing_queue_state = BQS1} end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index da7ff10d..99b5946e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,8 +35,7 @@ fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). --type(msg_fun(A) :: fun ((rabbit_types:basic_message(), boolean(), ack(), A) - -> A)). +-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)). -type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues @@ -133,10 +132,10 @@ -> {rabbit_types:message_properties() | undefined, state()}. %% Like dropwhile, except messages are fetched in "require -%% acknowledgement" mode and are passed, together with their Delivered -%% flag and ack tag, to the supplied function. The function is also -%% fed an accumulator. The result of fetchwhile is as for dropwhile -%% plus the accumulator. +%% acknowledgement" mode and are passed, together with their ack tag, +%% to the supplied function. The function is also fed an +%% accumulator. The result of fetchwhile is as for dropwhile plus the +%% accumulator. -callback fetchwhile(msg_pred(), msg_fun(A), A, state()) -> {rabbit_types:message_properties() | undefined, A, state()}. @@ -158,8 +157,7 @@ -callback requeue([ack()], state()) -> {msg_ids(), state()}. %% Fold over messages by ack tag. The supplied function is called with -%% each message, its IsDelivered flag, its ack tag, and an -%% accumulator. +%% each message, its ack tag, and an accumulator. -callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}. %% Fold over all the messages in a queue and return the accumulated diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 30606fdb..09ed3d08 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2434,7 +2434,7 @@ test_dropfetchwhile(VQ0) -> {#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} = rabbit_variable_queue:fetchwhile( fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end, - fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) -> + fun (Msg, AckTag, {MsgAcc, AckAcc}) -> {[Msg | MsgAcc], [AckTag | AckAcc]} end, {[], []}, VQ1), true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)], @@ -2473,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) -> fun (VQ1) -> {_, ok, VQ2} = rabbit_variable_queue:fetchwhile( fun (_) -> false end, - fun (_, _, _, A) -> A end, + fun (_, _, A) -> A end, ok, VQ1), VQ2 end, VQ0). @@ -2608,7 +2608,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_variable_queue_fold_msg_on_disk(VQ0) -> VQ1 = variable_queue_publish(true, 1, VQ0), {VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1), - {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _D, _A, ok) -> ok end, + {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end, ok, VQ2, AckTags), VQ3. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ce43200d..05468a6e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) -> {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), - {{Msg, IsDelivered, AckTag}, State3} = + {{Msg, _IsDelivered, AckTag}, State3} = internal_fetch(true, MsgStatus1, State2), - Acc1 = Fun(Msg, IsDelivered, AckTag, Acc), - fetchwhile(Pred, Fun, Acc1, State3); + fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3); false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))} end end. @@ -675,9 +674,9 @@ ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = lists:foldl( fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) -> - {#msg_status { msg = Msg, is_delivered = IsDelivered }, - State1 } = read_msg(gb_trees:get(SeqId, PA), false, State0), - {MsgFun(Msg, IsDelivered, SeqId, Acc0), State1} + {#msg_status { msg = Msg }, State1} = + read_msg(gb_trees:get(SeqId, PA), false, State0), + {MsgFun(Msg, SeqId, Acc0), State1} end, {Acc, State}, AckTags), {AccN, a(StateN)}. |