summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-01 19:55:38 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-01 19:55:38 +0000
commit0c603bc18f3276b7e1a72712f63f32aeb6de35e0 (patch)
tree2781b20ff33ae1b4e876e400b15f5575692fbd83
parent75d9df8f53a5dfe92386b65ac21e7c1789532ed2 (diff)
downloadrabbitmq-server-bug25372.tar.gz
drop IsDelivered from bq:{fetchwhile,ackfold}bug25372
since we don't need it
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_backing_queue.erl14
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl11
4 files changed, 18 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ee5122c..b5ad1ac0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -720,7 +720,7 @@ drop_expired_messages(State = #q{dlx = DLX,
undefined -> BQ:dropwhile(ExpirePred, BQS);
_ -> {Next, Msgs, BQS2} =
BQ:fetchwhile(ExpirePred,
- fun accumulate_msgs/4,
+ fun accumulate_msgs/3,
[], BQS),
case Msgs of
[] -> ok;
@@ -734,7 +734,7 @@ drop_expired_messages(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State#q{backing_queue_state = BQS1}).
-accumulate_msgs(Msg, _IsDelivered, AckTag, Acc) -> [{Msg, AckTag} | Acc].
+accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc].
ensure_ttl_timer(undefined, State) ->
State;
@@ -1203,9 +1203,8 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{ok, BQS1} = BQ:ackfold(
- fun (Msg, _IsDelivered, AckTag, ok) ->
- DLXFun([{Msg, AckTag}])
- end, ok, BQS, AckTags),
+ fun (M, A, ok) -> DLXFun([{M, A}]) end,
+ ok, BQS, AckTags),
State1#q{backing_queue_state = BQS1}
end));
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index da7ff10d..99b5946e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,8 +35,7 @@
fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
--type(msg_fun(A) :: fun ((rabbit_types:basic_message(), boolean(), ack(), A)
- -> A)).
+-type(msg_fun(A) :: fun ((rabbit_types:basic_message(), ack(), A) -> A)).
-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
@@ -133,10 +132,10 @@
-> {rabbit_types:message_properties() | undefined, state()}.
%% Like dropwhile, except messages are fetched in "require
-%% acknowledgement" mode and are passed, together with their Delivered
-%% flag and ack tag, to the supplied function. The function is also
-%% fed an accumulator. The result of fetchwhile is as for dropwhile
-%% plus the accumulator.
+%% acknowledgement" mode and are passed, together with their ack tag,
+%% to the supplied function. The function is also fed an
+%% accumulator. The result of fetchwhile is as for dropwhile plus the
+%% accumulator.
-callback fetchwhile(msg_pred(), msg_fun(A), A, state())
-> {rabbit_types:message_properties() | undefined,
A, state()}.
@@ -158,8 +157,7 @@
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
%% Fold over messages by ack tag. The supplied function is called with
-%% each message, its IsDelivered flag, its ack tag, and an
-%% accumulator.
+%% each message, its ack tag, and an accumulator.
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
%% Fold over all the messages in a queue and return the accumulated
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 30606fdb..09ed3d08 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2434,7 +2434,7 @@ test_dropfetchwhile(VQ0) ->
{#message_properties{expiry = 6}, {Msgs, AckTags}, VQ2} =
rabbit_variable_queue:fetchwhile(
fun (#message_properties{expiry = Expiry}) -> Expiry =< 5 end,
- fun (Msg, _Delivered, AckTag, {MsgAcc, AckAcc}) ->
+ fun (Msg, AckTag, {MsgAcc, AckAcc}) ->
{[Msg | MsgAcc], [AckTag | AckAcc]}
end, {[], []}, VQ1),
true = lists:seq(1, 5) == [msg2int(M) || M <- lists:reverse(Msgs)],
@@ -2473,7 +2473,7 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
fun (VQ1) ->
{_, ok, VQ2} = rabbit_variable_queue:fetchwhile(
fun (_) -> false end,
- fun (_, _, _, A) -> A end,
+ fun (_, _, A) -> A end,
ok, VQ1),
VQ2
end, VQ0).
@@ -2608,7 +2608,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
test_variable_queue_fold_msg_on_disk(VQ0) ->
VQ1 = variable_queue_publish(true, 1, VQ0),
{VQ2, AckTags} = variable_queue_fetch(1, true, false, 1, VQ1),
- {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _D, _A, ok) -> ok end,
+ {ok, VQ3} = rabbit_variable_queue:ackfold(fun (_M, _A, ok) -> ok end,
ok, VQ2, AckTags),
VQ3.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ce43200d..05468a6e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -597,10 +597,9 @@ fetchwhile(Pred, Fun, Acc, State) ->
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case Pred(MsgProps) of
true -> {MsgStatus1, State2} = read_msg(MsgStatus, State1),
- {{Msg, IsDelivered, AckTag}, State3} =
+ {{Msg, _IsDelivered, AckTag}, State3} =
internal_fetch(true, MsgStatus1, State2),
- Acc1 = Fun(Msg, IsDelivered, AckTag, Acc),
- fetchwhile(Pred, Fun, Acc1, State3);
+ fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
end
end.
@@ -675,9 +674,9 @@ ackfold(MsgFun, Acc, State, AckTags) ->
{AccN, StateN} =
lists:foldl(
fun(SeqId, {Acc0, State0 = #vqstate{ pending_ack = PA }}) ->
- {#msg_status { msg = Msg, is_delivered = IsDelivered },
- State1 } = read_msg(gb_trees:get(SeqId, PA), false, State0),
- {MsgFun(Msg, IsDelivered, SeqId, Acc0), State1}
+ {#msg_status { msg = Msg }, State1} =
+ read_msg(gb_trees:get(SeqId, PA), false, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.