summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-11 22:17:46 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-11 22:17:46 +0100
commitc54549506c2086b09bf9831455d410c86a4baf65 (patch)
treee41ff53d509c738dca24c55b7ad7c01c524c3e3d
parent0c0ba7aecefebfd31faee7856d1050bc01d03818 (diff)
parent1c8662ab33fa801960f5729507cb91e63aedaed4 (diff)
downloadrabbitmq-server-c54549506c2086b09bf9831455d410c86a4baf65.tar.gz
merging in bug20943
-rw-r--r--src/rabbit_amqqueue_process.erl249
1 files changed, 120 insertions, 129 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f8cfddf1..cf0ef44f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,14 +53,15 @@
has_had_consumers,
next_msg_id,
message_buffer,
- round_robin}).
+ active_consumers,
+ blocked_consumers}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
--record(cr, {consumers,
+-record(cr, {consumer_count,
ch_pid,
limiter_pid,
monitor_ref,
@@ -99,7 +100,8 @@ init(Q) ->
has_had_consumers = false,
next_msg_id = 1,
message_buffer = queue:new(),
- round_robin = queue:new()}, ?HIBERNATE_AFTER}.
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -129,7 +131,7 @@ ch_record(ChPid) ->
case get(Key) of
undefined ->
MonitorRef = erlang:monitor(process, ChPid),
- C = #cr{consumers = [],
+ C = #cr{consumer_count = 0,
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
@@ -165,13 +167,14 @@ record_current_channel_tx(ChPid, Txn) ->
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
+ active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- case queue:out(RoundRobin) of
+ case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
- RoundRobinTail} ->
+ ActiveConsumersTail} ->
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
@@ -187,18 +190,32 @@ deliver_immediately(Message, Delivered,
NewC = C#cr{unsent_message_count = Count + 1,
unacked_messages = NewUAM},
store_ch_record(NewC),
- NewConsumers =
+ {NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
+ ok -> {queue:in(QEntry, ActiveConsumersTail),
+ BlockedConsumers};
+ block ->
+ {ActiveConsumers1, BlockedConsumers1} =
+ move_consumers(ChPid,
+ ActiveConsumersTail,
+ BlockedConsumers),
+ {ActiveConsumers1,
+ queue:in(QEntry, BlockedConsumers1)}
end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId + 1}};
+ {offered, AckRequired,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
- NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_immediately(Message, Delivered,
- State#q{round_robin = NewConsumers})
+ {NewActiveConsumers, NewBlockedConsumers} =
+ move_consumers(ChPid,
+ ActiveConsumers,
+ BlockedConsumers),
+ deliver_immediately(
+ Message, Delivered,
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end;
{empty, _} ->
{not_offered, State}
@@ -234,22 +251,24 @@ deliver_or_enqueue_n(Messages, State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(queue:join(MessageBuffer, queue:from_list(Messages)),
State).
-block_consumers(ChPid, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ChPid, queue:to_list(RoundRobin)]),
- queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(RoundRobin))).
-
-unblock_consumers(ChPid, Consumers, RoundRobin) ->
- %%?LOGDEBUG("Unblocking ~p ~p ~p~n", [ChPid, Consumers, queue:to_list(RoundRobin)]),
- queue:join(RoundRobin,
- queue:from_list([{ChPid, Con} || Con <- Consumers])).
+add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-block_consumer(ChPid, ConsumerTag, RoundRobin) ->
- %%?LOGDEBUG("~p Blocking ~p from ~p~n", [self(), ConsumerTag, queue:to_list(RoundRobin)]),
+remove_consumer(ChPid, ConsumerTag, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
queue:from_list(lists:filter(
fun ({CP, #consumer{tag = CT}}) ->
(CP /= ChPid) or (CT /= ConsumerTag)
- end, queue:to_list(RoundRobin))).
+ end, queue:to_list(Queue))).
+
+remove_consumers(ChPid, Queue) ->
+ %% TODO: replace this with queue:filter/2 once we move to R12
+ queue:from_list(lists:filter(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(Queue))).
+
+move_consumers(ChPid, From, To) ->
+ {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
+ queue:to_list(From)),
+ {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
@@ -260,50 +279,25 @@ possibly_unblock(State, ChPid, Update) ->
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
- unblock -> NewRR = unblock_consumers(ChPid,
- NewC#cr.consumers,
- State#q.round_robin),
- run_poke_burst(State#q{round_robin = NewRR})
+ unblock -> {NewBlockedeConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedeConsumers})
end
end.
-check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
- {continue, State};
-check_auto_delete(State = #q{has_had_consumers = false}) ->
- {continue, State};
-check_auto_delete(State = #q{round_robin = RoundRobin}) ->
- % The clauses above rule out cases where no-one has consumed from
- % this queue yet, and cases where we are not an auto_delete queue
- % in any case. Thus it remains to check whether we have any active
- % listeners at this point.
- case queue:is_empty(RoundRobin) of
- true ->
- % There are no waiting listeners. It's possible that we're
- % completely unused. Check.
- case is_unused() of
- true ->
- % There are no active consumers at this
- % point. This is the signal to autodelete.
- {stop, State};
- false ->
- % There is at least one active consumer, so we
- % shouldn't delete ourselves.
- {continue, State}
- end;
- false ->
- % There are some waiting listeners, thus we are not
- % unused, so can continue life as normal without needing
- % to check the process dictionary.
- {continue, State}
- end.
+should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
+should_auto_delete(#q{has_had_consumers = false}) -> false;
+should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
- round_robin = ActiveConsumers}) ->
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found -> noreply(State);
#cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn,
unacked_messages = UAM} ->
- NewActive = block_consumers(ChPid, ActiveConsumers),
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
case Txn of
@@ -311,20 +305,22 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
_ -> ok = rollback_work(Txn, qname(State)),
erase_tx(Txn)
end,
- case check_auto_delete(
- deliver_or_enqueue_n(
- [{Message, true} ||
- {_Messsage_id, Message} <- dict:to_list(UAM)],
- State#q{
- exclusive_consumer = case Holder of
- {ChPid, _} -> none;
- Other -> Other
- end,
- round_robin = NewActive})) of
- {continue, NewState} ->
- noreply(NewState);
- {stop, NewState} ->
- {stop, normal, NewState}
+ NewState =
+ deliver_or_enqueue_n(
+ [{Message, true} ||
+ {_Messsage_id, Message} <- dict:to_list(UAM)],
+ State#q{
+ exclusive_consumer = case Holder of
+ {ChPid, _} -> none;
+ Other -> Other
+ end,
+ active_consumers = remove_consumers(
+ ChPid, State#q.active_consumers),
+ blocked_consumers = remove_consumers(
+ ChPid, State#q.blocked_consumers)}),
+ case should_auto_delete(NewState) of
+ false -> noreply(NewState);
+ true -> {stop, normal, NewState}
end
end.
@@ -337,12 +333,12 @@ check_queue_owner(none, _) -> ok;
check_queue_owner({ReaderPid, _}, ReaderPid) -> ok;
check_queue_owner({_, _}, _) -> mismatch.
-check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume) ->
+check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
-check_exclusive_access(none, false) ->
+check_exclusive_access(none, false, _State) ->
ok;
-check_exclusive_access(none, true) ->
- case is_unused() of
+check_exclusive_access(none, true, State) ->
+ case is_unused(State) of
true -> ok;
false -> in_use
end.
@@ -367,16 +363,8 @@ run_poke_burst(MessageBuffer, State) ->
State#q{message_buffer = MessageBuffer}
end.
-is_unused() ->
- is_unused1(get()).
-
-is_unused1([]) ->
- true;
-is_unused1([{{ch, _}, #cr{consumers = Consumers}} | _Rest])
- when Consumers /= [] ->
- false;
-is_unused1([_ | Rest]) ->
- is_unused1(Rest).
+is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
+ queue:is_empty(State#q.blocked_consumers).
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -535,9 +523,8 @@ i(messages, State) ->
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
-i(consumers, _) ->
- lists:sum([length(Consumers) ||
- #cr{consumers = Consumers} <- all_ch_record()]);
+i(consumers, State) ->
+ queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
i(transactions, _) ->
length(all_tx_record());
i(memory, _) ->
@@ -619,22 +606,22 @@ handle_call({basic_get, ChPid, NoAck}, _From,
handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder,
- round_robin = RoundRobin}) ->
+ exclusive_consumer = ExistingHolder}) ->
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
ok ->
- case check_exclusive_access(ExistingHolder, ExclusiveConsume) of
+ case check_exclusive_access(ExistingHolder, ExclusiveConsume,
+ State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumers = Consumers} = ch_record(ChPid),
+ C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not(NoAck)},
- store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ store_ch_record(C#cr{consumer_count = ConsumerCount +1,
limiter_pid = LimiterPid}),
- if Consumers == [] ->
+ if ConsumerCount == 0 ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok
@@ -648,58 +635,62 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ok = maybe_send_reply(ChPid, OkMsg),
State2 =
case is_ch_blocked(C) of
- true -> State1;
+ true -> State1#q{
+ blocked_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.blocked_consumers)};
false -> run_poke_burst(
State1#q{
- round_robin = queue:in(
- {ChPid, Consumer},
- RoundRobin)})
+ active_consumers =
+ add_consumer(
+ ChPid, Consumer,
+ State1#q.active_consumers)})
end,
reply(ok, State2)
end
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
- State = #q{exclusive_consumer = Holder,
- round_robin = RoundRobin}) ->
+ State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(ChPid) of
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
- NewConsumers = lists:filter
- (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
- Consumers),
- store_ch_record(C#cr{consumers = NewConsumers}),
- if NewConsumers == [] ->
+ C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} ->
+ store_ch_record(C#cr{consumer_count = ConsumerCount - 1}),
+ if ConsumerCount == 1 ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->
ok
end,
ok = maybe_send_reply(ChPid, OkMsg),
- case check_auto_delete(
- State#q{exclusive_consumer = cancel_holder(ChPid,
- ConsumerTag,
- Holder),
- round_robin = block_consumer(ChPid,
- ConsumerTag,
- RoundRobin)}) of
- {continue, State1} ->
- reply(ok, State1);
- {stop, State1} ->
- {stop, normal, ok, State1}
+ NewState =
+ State#q{exclusive_consumer = cancel_holder(ChPid,
+ ConsumerTag,
+ Holder),
+ active_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.active_consumers),
+ blocked_consumers = remove_consumer(
+ ChPid, ConsumerTag,
+ State#q.blocked_consumers)},
+ case should_auto_delete(NewState) of
+ false -> reply(ok, NewState);
+ true -> {stop, normal, ok, NewState}
end
end;
handle_call(stat, _From, State = #q{q = #amqqueue{name = Name},
message_buffer = MessageBuffer,
- round_robin = RoundRobin}) ->
- reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State);
+ active_consumers = ActiveConsumers}) ->
+ reply({ok, Name, queue:len(MessageBuffer), queue:len(ActiveConsumers)},
+ State);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{message_buffer = MessageBuffer}) ->
IsEmpty = queue:is_empty(MessageBuffer),
- IsUnused = is_unused(),
+ IsUnused = is_unused(State),
if
IfEmpty and not(IsEmpty) ->
reply({error, not_empty}, State);
@@ -718,7 +709,7 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
exclusive_consumer = Holder}) ->
case Owner of
none ->
- case check_exclusive_access(Holder, true) of
+ case check_exclusive_access(Holder, true, State) of
in_use ->
%% FIXME: Is this really the right answer? What if
%% an active consumer's reader is actually the
@@ -794,10 +785,10 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumers = Consumers,
+ fun (C = #cr{consumer_count = ConsumerCount,
limiter_pid = OldLimiterPid,
is_limit_active = Limited}) ->
- if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ if ConsumerCount =/= 0 andalso OldLimiterPid == undefined ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
ok