diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-22 17:23:16 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-09-22 17:23:16 +0100 |
commit | 67fcf0a81b3336960de2e3d2fd488fc260565480 (patch) | |
tree | d82931638ce4f84223c2ebab31293352e802ee25 | |
parent | f203e2fa14812e82166d6cab72909135a26036fb (diff) | |
parent | 61b426d83ebb25f9a6bc07cb5e1a5d3833b6d23e (diff) | |
download | rabbitmq-server-67fcf0a81b3336960de2e3d2fd488fc260565480.tar.gz |
Merging bug24433 to default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 224 |
1 files changed, 105 insertions, 119 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c28cd5bf..e3a2ca90 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,6 @@ backing_queue, backing_queue_state, active_consumers, - blocked_consumers, expires, sync_timer_ref, rate_timer_ref, @@ -60,6 +59,7 @@ monitor_ref, acktags, consumer_count, + blocked_consumers, limiter, is_limit_active, unsent_message_count}). @@ -124,7 +124,6 @@ init(Q) -> backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = undefined, @@ -150,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, backing_queue = BQ, backing_queue_state = BQS, active_consumers = queue:new(), - blocked_consumers = queue:new(), expires = undefined, sync_timer_ref = undefined, rate_timer_ref = RateTRef, @@ -340,6 +338,7 @@ ch_record(ChPid) -> monitor_ref = MonitorRef, acktags = sets:new(), consumer_count = 0, + blocked_consumers = queue:new(), is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -381,6 +380,9 @@ update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. +block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> + update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). + is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. @@ -392,67 +394,56 @@ ch_record_state_transition(OldCR, NewCR) -> end. deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, - State = #q{q = #amqqueue{name = QName}, - active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - case queue:out(ActiveConsumers) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, - ActiveConsumersTail} -> - C = #cr{limiter = Limiter, - unsent_message_count = Count, - acktags = ChAckTags} = ch_record(ChPid), - IsMsgReady = PredFun(FunAcc, State), - case (IsMsgReady andalso - rabbit_limiter:can_send(Limiter, self(), AckRequired)) of - true -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = - case AckRequired of - true -> sets:add_element(AckTag, ChAckTags); - false -> ChAckTags - end, - NewC = update_ch_record( - C#cr{unsent_message_count = Count + 1, - acktags = ChAckTags1}), - {NewActiveConsumers, NewBlockedConsumers} = - case ch_record_state_transition(C, NewC) of - ok -> {queue:in(QEntry, ActiveConsumersTail), - BlockedConsumers}; - block -> {ActiveConsumers1, BlockedConsumers1} = - move_consumers(ChPid, - ActiveConsumersTail, - BlockedConsumers), - {ActiveConsumers1, - queue:in(QEntry, BlockedConsumers1)} - end, - State2 = State1#q{ - active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); - %% if IsMsgReady then we've hit the limiter - false when IsMsgReady -> - update_ch_record(C#cr{is_limit_active = true}), - {NewActiveConsumers, NewBlockedConsumers} = - move_consumers(ChPid, - ActiveConsumers, - BlockedConsumers), - deliver_msgs_to_consumers( - Funs, FunAcc, - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}); - false -> - %% no message was ready, so we don't need to block anyone - {FunAcc, State} - end; - {empty, _} -> - {FunAcc, State} + State = #q{active_consumers = ActiveConsumers}) -> + case PredFun(FunAcc, State) of + false -> {FunAcc, State}; + true -> case queue:out(ActiveConsumers) of + {empty, _} -> + {FunAcc, State}; + {{value, QEntry}, Tail} -> + {FunAcc1, State1} = + deliver_msg_to_consumer( + DeliverFun, QEntry, + FunAcc, State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(Funs, FunAcc1, State1) + end + end. + +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> + C = ch_record(ChPid), + case is_ch_blocked(C) of + true -> block_consumer(C, E), + {FunAcc, State}; + false -> case rabbit_limiter:can_send(C#cr.limiter, self(), + Consumer#consumer.ack_required) of + false -> block_consumer(C#cr{is_limit_active = true}, E), + {FunAcc, State}; + true -> AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C, FunAcc, + State#q{active_consumers = AC1}) + end end. +deliver_msg_to_consumer(DeliverFun, + #consumer{tag = ConsumerTag, + ack_required = AckRequired}, + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + unsent_message_count = Count}, + FunAcc, State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, FunAcc1, State1} = + DeliverFun(AckRequired, FunAcc, State), + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), AckTag, IsDelivered, Message}), + ChAckTags1 = case AckRequired of + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags + end, + update_ch_record(C#cr{acktags = ChAckTags1, + unsent_message_count = Count + 1}), + {FunAcc1, State1}. + deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. deliver_from_queue_deliver(AckRequired, false, State) -> @@ -589,43 +580,34 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS, {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. -add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). - remove_consumer(ChPid, ConsumerTag, Queue) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) -> (CP /= ChPid) or (CTag /= ConsumerTag) end, Queue). remove_consumers(ChPid, Queue) -> - {Kept, Removed} = split_by_channel(ChPid, Queue), - [emit_consumer_deleted(Ch, CTag) || - {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], - Kept. - -move_consumers(ChPid, From, To) -> - {Kept, Removed} = split_by_channel(ChPid, From), - {Kept, queue:join(To, Removed)}. - -split_by_channel(ChPid, Queue) -> - {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(Queue)), - {queue:from_list(Kept), queue:from_list(Removed)}. + queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid -> + emit_consumer_deleted(ChPid, CTag), + false; + (_) -> + true + end, Queue). possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of not_found -> State; C -> - NewC = update_ch_record(Update(C)), - case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_message_queue( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + C1 = Update(C), + case ch_record_state_transition(C, C1) of + ok -> update_ch_record(C1), + State; + unblock -> #cr{blocked_consumers = Consumers} = C1, + update_ch_record( + C1#cr{blocked_consumers = queue:new()}), + AC1 = queue:join(State#q.active_consumers, + Consumers), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -637,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - C = #cr{ch_pid = ChPid, acktags = ChAckTags} -> + C = #cr{ch_pid = ChPid, + acktags = ChAckTags, + blocked_consumers = Blocked} -> + _ = remove_consumers(ChPid, Blocked), %% for stats emission ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of @@ -645,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers), - blocked_consumers = remove_consumers( - ChPid, State#q.blocked_consumers)}, + ChPid, State#q.active_consumers)}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -665,8 +648,15 @@ check_exclusive_access(none, true, State) -> false -> in_use end. -is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso - queue:is_empty(State#q.blocked_consumers). +consumer_count() -> consumer_count(fun (_) -> false end). + +active_consumer_count() -> consumer_count(fun is_ch_blocked/1). + +consumer_count(Exclude) -> + lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), + not Exclude(C)]). + +is_unused(_State) -> consumer_count() == 0. maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -792,8 +782,8 @@ i(messages_unacknowledged, _) -> i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, messages_unacknowledged]]); -i(consumers, State) -> - queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); +i(consumers, _) -> + consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -809,13 +799,15 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). -consumers(#q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> +consumers(#q{active_consumers = ActiveConsumers}) -> + lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end, + consumers(ActiveConsumers, []), all_ch_record()). + +consumers(Consumers, Acc) -> rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)). + fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) -> + [{ChPid, CTag, AckRequired} | Acc1] + end, Acc, Consumers). emit_stats(State) -> emit_stats(State, []). @@ -982,17 +974,14 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), + E = {ChPid, Consumer}, State2 = case is_ch_blocked(C1) of - true -> State1#q{ - blocked_consumers = - add_consumer(ChPid, Consumer, - State1#q.blocked_consumers)}; - false -> run_message_queue( - State1#q{ - active_consumers = - add_consumer(ChPid, Consumer, - State1#q.active_consumers)}) + true -> block_consumer(C1, E), + State1; + false -> update_ch_record(C1), + AC1 = queue:in(E, State1#q.active_consumers), + run_message_queue(State1#q{active_consumers = AC1}) end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck), @@ -1005,9 +994,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C -> - update_consumer_count(C, -1), + C = #cr{blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag), + Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1015,10 +1005,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end, active_consumers = remove_consumer( ChPid, ConsumerTag, - State#q.active_consumers), - blocked_consumers = remove_consumer( - ChPid, ConsumerTag, - State#q.blocked_consumers)}, + State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> {stop, normal, ok, State1} @@ -1026,10 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, end; handle_call(stat, _From, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS, - active_consumers = ActiveConsumers} = + State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(ensure_expiry_timer(State)), - reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1); + reply({ok, BQ:len(BQS), active_consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> |