diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 125 |
1 files changed, 56 insertions, 69 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7719dfe7..b32fa0ff 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -46,7 +46,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - guid_to_channel, + msg_id_to_channel, ttl, ttl_timer_ref }). @@ -112,7 +112,7 @@ init(Q) -> expiry_timer_ref = undefined, ttl = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()}, hibernate, + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -283,17 +283,16 @@ lookup_ch(ChPid) -> ch_record(ChPid) -> Key = {ch, ChPid}, case get(Key) of - undefined -> - MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumer_count = 0, - ch_pid = ChPid, - monitor_ref = MonitorRef, - acktags = sets:new(), - is_limit_active = false, - txn = none, - unsent_message_count = 0}, - put(Key, C), - C; + undefined -> MonitorRef = erlang:monitor(process, ChPid), + C = #cr{consumer_count = 0, + ch_pid = ChPid, + monitor_ref = MonitorRef, + acktags = sets:new(), + is_limit_active = false, + txn = none, + unsent_message_count = 0}, + put(Key, C), + C; C = #cr{} -> C end. @@ -319,18 +318,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -all_ch_record() -> - [C || {{ch, _}, C} <- get()]. +all_ch_record() -> [C || {{ch, _}, C} <- get()]. is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. ch_record_state_transition(OldCR, NewCR) -> - BlockedOld = is_ch_blocked(OldCR), - BlockedNew = is_ch_blocked(NewCR), - if BlockedOld andalso not(BlockedNew) -> unblock; - BlockedNew andalso not(BlockedOld) -> block; - true -> ok + case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of + {true, false} -> unblock; + {false, true} -> block; + {_, _} -> ok end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, @@ -365,13 +362,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), BlockedConsumers}; - block -> - {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} + block -> {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, State2 = State1#q{ active_consumers = NewActiveConsumers, @@ -396,30 +392,28 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end. -deliver_from_queue_pred(IsEmpty, _State) -> - not IsEmpty. +deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> - {CMs, GTC1} = - lists:foldl( - fun(Guid, {CMs, GTC0}) -> - case dict:find(Guid, GTC0) of - {ok, {ChPid, MsgSeqNo}} -> - {gb_trees_cons(ChPid, MsgSeqNo, CMs), - dict:erase(Guid, GTC0)}; - _ -> - {CMs, GTC0} - end - end, {gb_trees:empty(), GTC}, Guids), +confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> + {CMs, MTC1} = lists:foldl( + fun(MsgId, {CMs, MTC0}) -> + case dict:find(MsgId, MTC0) of + {ok, {ChPid, MsgSeqNo}} -> + {gb_trees_cons(ChPid, MsgSeqNo, CMs), + dict:erase(MsgId, MTC0)}; + _ -> + {CMs, MTC0} + end + end, {gb_trees:empty(), MTC}, MsgIds), gb_trees:map(fun(ChPid, MsgSeqNos) -> rabbit_channel:confirm(ChPid, MsgSeqNos) end, CMs), - State#q{guid_to_channel = GTC1}. + State#q{msg_id_to_channel = MTC1}. gb_trees_cons(Key, Value, Tree) -> case gb_trees:lookup(Key, Tree) of @@ -433,12 +427,12 @@ record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, - guid = Guid}}, + id = MsgId}}, State = - #q{guid_to_channel = GTC, - q = #amqqueue{durable = true}}) -> + #q{msg_id_to_channel = MTC, + q = #amqqueue{durable = true}}) -> {confirm, - State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}}; + State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; record_confirm_message(_Delivery, State) -> {no_confirm, State}. @@ -480,17 +474,14 @@ attempt_delivery(#delivery{txn = none, {Delivered, State1} = deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), {Delivered, NeedsConfirming, State1}; -attempt_delivery(#delivery{txn = Txn, +attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, - {NeedsConfirming, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - {true, - NeedsConfirming, - State#q{backing_queue_state = - BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. + BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), + {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of @@ -618,9 +609,9 @@ backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {Guids, BQS1} = Fun(BQS), + {MsgIds, BQS1} = Fun(BQS), run_message_queue( - confirm_messages(Guids, State#q{backing_queue_state = BQS1})). + confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -661,9 +652,8 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> - Now > Expiry - end, BQS), + fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, @@ -767,8 +757,8 @@ prioritise_cast(Msg, _State) -> maybe_expire -> 8; drop_expired -> 8; emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; {maybe_run_queue_via_backing_queue, _Fun} -> 6; @@ -814,8 +804,7 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver_immediately, Delivery}, - _From, State) -> +handle_call({deliver_immediately, Delivery}, _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -906,15 +895,13 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, case is_ch_blocked(C) of true -> State1#q{ blocked_consumers = - add_consumer( - ChPid, Consumer, - State1#q.blocked_consumers)}; + add_consumer(ChPid, Consumer, + State1#q.blocked_consumers)}; false -> run_message_queue( State1#q{ active_consumers = - add_consumer( - ChPid, Consumer, - State1#q.active_consumers)}) + add_consumer(ChPid, Consumer, + State1#q.active_consumers)}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), |