summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-18 00:31:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-18 00:31:00 +0100
commit0699519a7db14dfb5d8acd43809f147a472f0e5f (patch)
tree8f82a65ca6cfa18d89500e36f2baef7394d69e01
parent18555077d5e14138c9568691367f0277b5ab0f79 (diff)
downloadrabbitmq-server-0699519a7db14dfb5d8acd43809f147a472f0e5f.tar.gz
extract ack handling logic
in common between 'ack', 'reject' and 'requeue'
-rw-r--r--src/rabbit_amqqueue_process.erl67
1 files changed, 33 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b66109e3..da3c51bd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -687,8 +687,16 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-subtract_acks(A, B) when is_list(B) ->
- lists:foldl(fun sets:del_element/2, A, B).
+subtract_acks(ChPid, AckTags, State, Fun) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ State;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = lists:foldl(fun sets:del_element/2,
+ ChAckTags, AckTags)}),
+ Fun(State)
+ end.
discard_delivery(#delivery{sender = ChPid,
message = Message},
@@ -1051,13 +1059,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
- noreply(requeue_and_run(AckTags, State))
- end.
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end)).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1066,31 +1070,26 @@ handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
noreply(deliver_or_enqueue(Delivery, State));
-handle_cast({ack, AckTags, ChPid},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- noreply(State#q{backing_queue_state = BQS1})
- end;
-
-handle_cast({reject, AckTags, Requeue, ChPid},
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
- noreply(case Requeue of
- true -> requeue_and_run(AckTags, State);
- false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1}
- end)
- end;
+handle_cast({ack, AckTags, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end));
+
+handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case Requeue of
+ true -> requeue_and_run(AckTags, State1);
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end
+ end));
handle_cast(delete_immediately, State) ->
{stop, normal, State};