diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-06-11 22:17:46 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-06-11 22:17:46 +0100 |
commit | c54549506c2086b09bf9831455d410c86a4baf65 (patch) | |
tree | e41ff53d509c738dca24c55b7ad7c01c524c3e3d | |
parent | 0c0ba7aecefebfd31faee7856d1050bc01d03818 (diff) | |
parent | 1c8662ab33fa801960f5729507cb91e63aedaed4 (diff) | |
download | rabbitmq-server-c54549506c2086b09bf9831455d410c86a4baf65.tar.gz |
merging in bug20943
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 249 |
1 files changed, 120 insertions, 129 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f8cfddf1..cf0ef44f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,14 +53,15 @@ has_had_consumers, next_msg_id, message_buffer, - round_robin}). + active_consumers, + blocked_consumers}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary --record(cr, {consumers, +-record(cr, {consumer_count, ch_pid, limiter_pid, monitor_ref, @@ -99,7 +100,8 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}, ?HIBERNATE_AFTER}. + active_consumers = queue:new(), + blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -129,7 +131,7 @@ ch_record(ChPid) -> case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), - C = #cr{consumers = [], + C = #cr{consumer_count = 0, ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), @@ -165,13 +167,14 @@ record_current_channel_tx(ChPid, Txn) -> deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, + active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - case queue:out(RoundRobin) of + case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, - RoundRobinTail} -> + ActiveConsumersTail} -> C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), @@ -187,18 +190,32 @@ deliver_immediately(Message, Delivered, NewC = C#cr{unsent_message_count = Count + 1, unacked_messages = NewUAM}, store_ch_record(NewC), - NewConsumers = + {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) + ok -> {queue:in(QEntry, ActiveConsumersTail), + BlockedConsumers}; + block -> + {ActiveConsumers1, BlockedConsumers1} = + move_consumers(ChPid, + ActiveConsumersTail, + BlockedConsumers), + {ActiveConsumers1, + queue:in(QEntry, BlockedConsumers1)} end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId + 1}}; + {offered, AckRequired, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers, + next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), - NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_immediately(Message, Delivered, - State#q{round_robin = NewConsumers}) + {NewActiveConsumers, NewBlockedConsumers} = + move_consumers(ChPid, + ActiveConsumers, + BlockedConsumers), + deliver_immediately( + Message, Delivered, + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end; {empty, _} -> {not_offered, State} @@ -234,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)), State). -block_consumers(ChPid, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]), - queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(RoundRobin))). - -unblock_consumers(ChPid, Consumers, RoundRobin) -> - %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]), - queue:join(RoundRobin, - queue:from_list([{ChPid, Con} || Con <- Consumers])). +add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). -block_consumer(ChPid, ConsumerTag, RoundRobin) -> - %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]), +remove_consumer(ChPid, ConsumerTag, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 queue:from_list(lists:filter( fun ({CP, #consumer{tag = CT}}) -> (CP /= ChPid) or (CT /= ConsumerTag) - end, queue:to_list(RoundRobin))). + end, queue:to_list(Queue))). + +remove_consumers(ChPid, Queue) -> + %% TODO: replace this with queue:filter/2 once we move to R12 + queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(Queue))). + +move_consumers(ChPid, From, To) -> + {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, + queue:to_list(From)), + {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -260,50 +279,25 @@ possibly_unblock(State, ChPid, Update) -> store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; - unblock -> NewRR = unblock_consumers(ChPid, - NewC#cr.consumers, - State#q.round_robin), - run_poke_burst(State#q{round_robin = NewRR}) + unblock -> {NewBlockedeConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedeConsumers}) end end. -check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> - {continue, State}; -check_auto_delete(State = #q{has_had_consumers = false}) -> - {continue, State}; -check_auto_delete(State = #q{round_robin = RoundRobin}) -> - % The clauses above rule out cases where no-one has consumed from - % this queue yet, and cases where we are not an auto_delete queue - % in any case. Thus it remains to check whether we have any active - % listeners at this point. - case queue:is_empty(RoundRobin) of - true -> - % There are no waiting listeners. It's possible that we're - % completely unused. Check. - case is_unused() of - true -> - % There are no active consumers at this - % point. This is the signal to autodelete. - {stop, State}; - false -> - % There is at least one active consumer, so we - % shouldn't delete ourselves. - {continue, State} - end; - false -> - % There are some waiting listeners, thus we are not - % unused, so can continue life as normal without needing - % to check the process dictionary. - {continue, State} - end. +should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; +should_auto_delete(#q{has_had_consumers = false}) -> false; +should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, - round_robin = ActiveConsumers}) -> +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> - NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), erase({ch, ChPid}), case Txn of @@ -311,20 +305,22 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, _ -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn) end, - case check_auto_delete( - deliver_or_enqueue_n( - [{Message, true} || - {_Messsage_id, Message} <- dict:to_list(UAM)], - State#q{ - exclusive_consumer = case Holder of - {ChPid, _} -> none; - Other -> Other - end, - round_robin = NewActive})) of - {continue, NewState} -> - noreply(NewState); - {stop, NewState} -> - {stop, normal, NewState} + NewState = + deliver_or_enqueue_n( + [{Message, true} || + {_Messsage_id, Message} <- dict:to_list(UAM)], + State#q{ + exclusive_consumer = case Holder of + {ChPid, _} -> none; + Other -> Other + end, + active_consumers = remove_consumers( + ChPid, State#q.active_consumers), + blocked_consumers = remove_consumers( + ChPid, State#q.blocked_consumers)}), + case should_auto_delete(NewState) of + false -> noreply(NewState); + true -> {stop, normal, NewState} end end. @@ -337,12 +333,12 @@ check_queue_owner(none, _) -> ok; check_queue_owner({ReaderPid, _}, ReaderPid) -> ok; check_queue_owner({_, _}, _) -> mismatch. -check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) -> +check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) -> in_use; -check_exclusive_access(none, false) -> +check_exclusive_access(none, false, _State) -> ok; -check_exclusive_access(none, true) -> - case is_unused() of +check_exclusive_access(none, true, State) -> + case is_unused(State) of true -> ok; false -> in_use end. @@ -367,16 +363,8 @@ run_poke_burst(MessageBuffer, State) -> State#q{message_buffer = MessageBuffer} end. -is_unused() -> - is_unused1(get()). - -is_unused1([]) -> - true; -is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest]) - when Consumers /= [] -> - false; -is_unused1([_ | Rest]) -> - is_unused1(Rest). +is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso + queue:is_empty(State#q.blocked_consumers). maybe_send_reply(_ChPid, undefined) -> ok; maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). @@ -535,9 +523,8 @@ i(messages, State) -> i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); -i(consumers, _) -> - lists:sum([length(Consumers) || - #cr{consumers = Consumers} <- all_ch_record()]); +i(consumers, State) -> + queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers); i(transactions, _) -> length(all_tx_record()); i(memory, _) -> @@ -619,22 +606,22 @@ handle_call({basic_get, ChPid, NoAck}, _From, handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> + exclusive_consumer = ExistingHolder}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); ok -> - case check_exclusive_access(ExistingHolder, ExclusiveConsume) of + case check_exclusive_access(ExistingHolder, ExclusiveConsume, + State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumers = Consumers} = ch_record(ChPid), + C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - store_ch_record(C#cr{consumers = [Consumer | Consumers], + store_ch_record(C#cr{consumer_count = ConsumerCount +1, limiter_pid = LimiterPid}), - if Consumers == [] -> + if ConsumerCount == 0 -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok @@ -648,58 +635,62 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok = maybe_send_reply(ChPid, OkMsg), State2 = case is_ch_blocked(C) of - true -> State1; + true -> State1#q{ + blocked_consumers = + add_consumer( + ChPid, Consumer, + State1#q.blocked_consumers)}; false -> run_poke_burst( State1#q{ - round_robin = queue:in( - {ChPid, Consumer}, - RoundRobin)}) + active_consumers = + add_consumer( + ChPid, Consumer, + State1#q.active_consumers)}) end, reply(ok, State2) end end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, - State = #q{exclusive_consumer = Holder, - round_robin = RoundRobin}) -> + State = #q{exclusive_consumer = Holder}) -> case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> - NewConsumers = lists:filter - (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, - Consumers), - store_ch_record(C#cr{consumers = NewConsumers}), - if NewConsumers == [] -> + C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> + store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), + if ConsumerCount == 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> ok end, ok = maybe_send_reply(ChPid, OkMsg), - case check_auto_delete( - State#q{exclusive_consumer = cancel_holder(ChPid, - ConsumerTag, - Holder), - round_robin = block_consumer(ChPid, - ConsumerTag, - RoundRobin)}) of - {continue, State1} -> - reply(ok, State1); - {stop, State1} -> - {stop, normal, ok, State1} + NewState = + State#q{exclusive_consumer = cancel_holder(ChPid, + ConsumerTag, + Holder), + active_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.active_consumers), + blocked_consumers = remove_consumer( + ChPid, ConsumerTag, + State#q.blocked_consumers)}, + case should_auto_delete(NewState) of + false -> reply(ok, NewState); + true -> {stop, normal, ok, NewState} end end; handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, - round_robin = RoundRobin}) -> - reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); + active_consumers = ActiveConsumers}) -> + reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)}, + State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> IsEmpty = queue:is_empty(MessageBuffer), - IsUnused = is_unused(), + IsUnused = is_unused(State), if IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); @@ -718,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> case Owner of none -> - case check_exclusive_access(Holder, true) of + case check_exclusive_access(Holder, true, State) of in_use -> %% FIXME: Is this really the right answer? What if %% an active consumer's reader is actually the @@ -794,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( possibly_unblock( State, ChPid, - fun (C = #cr{consumers = Consumers, + fun (C = #cr{consumer_count = ConsumerCount, limiter_pid = OldLimiterPid, is_limit_active = Limited}) -> - if Consumers =/= [] andalso OldLimiterPid == undefined -> + if ConsumerCount =/= 0 andalso OldLimiterPid == undefined -> ok = rabbit_limiter:register(LimiterPid, self()); true -> ok |