diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 00:31:00 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 00:31:00 +0100 |
commit | 0699519a7db14dfb5d8acd43809f147a472f0e5f (patch) | |
tree | 8f82a65ca6cfa18d89500e36f2baef7394d69e01 | |
parent | 18555077d5e14138c9568691367f0277b5ab0f79 (diff) | |
download | rabbitmq-server-0699519a7db14dfb5d8acd43809f147a472f0e5f.tar.gz |
extract ack handling logic
in common between 'ack', 'reject' and 'requeue'
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 67 |
1 files changed, 33 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b66109e3..da3c51bd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -687,8 +687,16 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -subtract_acks(A, B) when is_list(B) -> - lists:foldl(fun sets:del_element/2, A, B). +subtract_acks(ChPid, AckTags, State, Fun) -> + case lookup_ch(ChPid) of + not_found -> + State; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = lists:foldl(fun sets:del_element/2, + ChAckTags, AckTags)}), + Fun(State) + end. discard_delivery(#delivery{sender = ChPid, message = Message}, @@ -1051,13 +1059,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), - noreply(requeue_and_run(AckTags, State)) - end. + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end)). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1066,31 +1070,26 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, AckTags, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - noreply(State#q{backing_queue_state = BQS1}) - end; - -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), - noreply(case Requeue of - true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1} - end) - end; +handle_cast({ack, AckTags, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end)); + +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case Requeue of + true -> requeue_and_run(AckTags, State1); + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end + end)); handle_cast(delete_immediately, State) -> {stop, normal, State}; |