diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-14 11:48:37 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-14 11:48:37 +0100 |
commit | 57b78e60c441714d48f30a1adfaea7011a756eb1 (patch) | |
tree | 61d93a2aa8a87e5cf7d07777135b9e023a63c34f /src/rabbit_amqqueue_process.erl | |
parent | 6d6fb807e4a6b57dee8a72e650fa550d3ed23a4c (diff) | |
parent | cbcc208629db5bac9d26862a481aa079d8e89478 (diff) | |
download | rabbitmq-server-57b78e60c441714d48f30a1adfaea7011a756eb1.tar.gz |
Merging default to bug24130
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 127 |
1 files changed, 20 insertions, 107 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cb8a485e..9fe13d0d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -62,7 +62,6 @@ monitor_ref, acktags, is_limit_active, - txn, unsent_message_count}). -define(STATISTICS_KEYS, @@ -193,14 +192,7 @@ bq_init(BQ, Q, Recover) -> Self = self(), BQ:init(Q, Recover, fun (Mod, Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) - end, - fun (Mod, Fun) -> - rabbit_misc:with_exit_handler( - fun () -> error end, - fun () -> - rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) - end) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -217,22 +209,14 @@ init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). terminate_shutdown(Fun, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), case BQS of undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), - BQS1 = lists:foldl( - fun (#cr{txn = none}, BQSN) -> - BQSN; - (#cr{txn = Txn}, BQSN) -> - {_AckTags, BQSN1} = - BQ:tx_rollback(Txn, BQSN), - BQSN1 - end, BQS, all_ch_record()), [emit_consumer_deleted(Ch, CTag) || {Ch, CTag, _} <- consumers(State1)], - State1#q{backing_queue_state = Fun(BQS1)} + State1#q{backing_queue_state = Fun(BQS)} end. reply(Reply, NewState) -> @@ -343,7 +327,6 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), is_limit_active = false, - txn = none, unsent_message_count = 0}, put(Key, C), C; @@ -355,13 +338,12 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, - txn = Txn, unsent_message_count = UnsentMessageCount}) -> - case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of - {0, 0, 0, none} -> ok = erase_ch_record(C), - false; - _ -> store_ch_record(C), - true + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of + {0, 0, 0} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true end. erase_ch_record(#cr{ch_pid = ChPid, @@ -513,8 +495,7 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(Delivery = #delivery{txn = none, - sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -523,7 +504,7 @@ attempt_delivery(Delivery = #delivery{txn = none, immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - case BQ:is_duplicate(none, Message, BQS) of + case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -555,24 +536,6 @@ attempt_delivery(Delivery = #delivery{txn = none, discarded -> false end, {Delivered, Confirm, State#q{backing_queue_state = BQS1}} - end; -attempt_delivery(Delivery = #delivery{txn = Txn, - sender = ChPid, - message = Message}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - Confirm = should_confirm_message(Delivery, State), - case BQ:is_duplicate(Txn, Message, BQS) of - {false, BQS1} -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, - BQS1), - {true, Confirm, State#q{backing_queue_state = BQS2}}; - {Duplicate, BQS1} -> - Delivered = case Duplicate of - published -> true; - discarded -> false - end, - {Delivered, Confirm, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, @@ -652,7 +615,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -665,13 +628,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; - false -> State2 = case Txn of - none -> State1; - _ -> rollback_transaction(Txn, C, - State1) - end, - {ok, requeue_and_run(sets:to_list(ChAckTags), - ensure_expiry_timer(State2))} + false -> {ok, requeue_and_run(sets:to_list(ChAckTags), + ensure_expiry_timer(State1))} end end. @@ -705,25 +663,6 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS, - ttl = TTL}) -> - {AckTags, BQS1} = BQ:tx_commit( - Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(TTL), BQS), - ChAckTags1 = subtract_acks(ChAckTags, AckTags), - maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), - State#q{backing_queue_state = BQS1}. - -rollback_transaction(Txn, C, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {_AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - %% Iff we removed acktags from the channel record on ack+txn then - %% we would add them back in here. - maybe_store_ch_record(C#cr{txn = none}), - State#q{backing_queue_state = BQS1}. - subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). @@ -860,7 +799,6 @@ prioritise_call(Msg, _From, _State) -> info -> 9; {info, _Items} -> 9; consumers -> 9; - {run_backing_queue, _Mod, _Fun} -> 6; _ -> 0 end. @@ -873,7 +811,7 @@ prioritise_cast(Msg, _State) -> maybe_expire -> 8; drop_expired -> 8; emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; + {ack, _AckTags, _ChPid} -> 7; {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; @@ -945,13 +883,6 @@ handle_call({deliver, Delivery}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({commit, Txn, ChPid}, From, State) -> - case lookup_ch(ChPid) of - not_found -> reply(ok, State); - C -> noreply(run_message_queue( - commit_transaction(Txn, From, C, State))) - end; - handle_call({notify_down, ChPid}, _From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the @@ -1091,11 +1022,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) - end; - -handle_call({run_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, run_backing_queue(Mod, Fun, State)). - + end. handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1107,24 +1034,16 @@ handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. noreply(deliver_or_enqueue(Delivery, State)); -handle_cast({ack, Txn, AckTags, ChPid}, +handle_cast({ack, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, State1} = - case Txn of - none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - NewC = C#cr{acktags = ChAckTags1}, - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - {NewC, State#q{backing_queue_state = BQS1}}; - _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), - {C#cr{txn = Txn}, - State#q{backing_queue_state = BQS1}} - end, - maybe_store_ch_record(C1), - noreply(State1) + maybe_store_ch_record(C#cr{acktags = subtract_acks( + ChAckTags, AckTags)}), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + noreply(State#q{backing_queue_state = BQS1}) end; handle_cast({reject, AckTags, Requeue, ChPid}, @@ -1143,12 +1062,6 @@ handle_cast({reject, AckTags, Requeue, ChPid}, end) end; -handle_cast({rollback, Txn, ChPid}, State) -> - noreply(case lookup_ch(ChPid) of - not_found -> State; - C -> rollback_transaction(Txn, C, State) - end); - handle_cast(delete_immediately, State) -> {stop, normal, State}; |