summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-09-22 17:23:16 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-09-22 17:23:16 +0100
commit67fcf0a81b3336960de2e3d2fd488fc260565480 (patch)
treed82931638ce4f84223c2ebab31293352e802ee25
parentf203e2fa14812e82166d6cab72909135a26036fb (diff)
parent61b426d83ebb25f9a6bc07cb5e1a5d3833b6d23e (diff)
downloadrabbitmq-server-67fcf0a81b3336960de2e3d2fd488fc260565480.tar.gz
Merging bug24433 to default
-rw-r--r--src/rabbit_amqqueue_process.erl224
1 files changed, 105 insertions, 119 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c28cd5bf..e3a2ca90 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,6 @@
backing_queue,
backing_queue_state,
active_consumers,
- blocked_consumers,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -60,6 +59,7 @@
monitor_ref,
acktags,
consumer_count,
+ blocked_consumers,
limiter,
is_limit_active,
unsent_message_count}).
@@ -124,7 +124,6 @@ init(Q) ->
backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
@@ -150,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = RateTRef,
@@ -340,6 +338,7 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
acktags = sets:new(),
consumer_count = 0,
+ blocked_consumers = queue:new(),
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -381,6 +380,9 @@ update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
@@ -392,67 +394,56 @@ ch_record_state_transition(OldCR, NewCR) ->
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- case queue:out(ActiveConsumers) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
- ActiveConsumersTail} ->
- C = #cr{limiter = Limiter,
- unsent_message_count = Count,
- acktags = ChAckTags} = ch_record(ChPid),
- IsMsgReady = PredFun(FunAcc, State),
- case (IsMsgReady andalso
- rabbit_limiter:can_send(Limiter, self(), AckRequired)) of
- true ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 =
- case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- NewC = update_ch_record(
- C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1}),
- {NewActiveConsumers, NewBlockedConsumers} =
- case ch_record_state_transition(C, NewC) of
- ok -> {queue:in(QEntry, ActiveConsumersTail),
- BlockedConsumers};
- block -> {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
- end,
- State2 = State1#q{
- active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
- %% if IsMsgReady then we've hit the limiter
- false when IsMsgReady ->
- update_ch_record(C#cr{is_limit_active = true}),
- {NewActiveConsumers, NewBlockedConsumers} =
- move_consumers(ChPid,
- ActiveConsumers,
- BlockedConsumers),
- deliver_msgs_to_consumers(
- Funs, FunAcc,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers});
- false ->
- %% no message was ready, so we don't need to block anyone
- {FunAcc, State}
- end;
- {empty, _} ->
- {FunAcc, State}
+ State = #q{active_consumers = ActiveConsumers}) ->
+ case PredFun(FunAcc, State) of
+ false -> {FunAcc, State};
+ true -> case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {FunAcc, State};
+ {{value, QEntry}, Tail} ->
+ {FunAcc1, State1} =
+ deliver_msg_to_consumer(
+ DeliverFun, QEntry,
+ FunAcc, State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(Funs, FunAcc1, State1)
+ end
+ end.
+
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+ C = ch_record(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ {FunAcc, State};
+ false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
+ Consumer#consumer.ack_required) of
+ false -> block_consumer(C#cr{is_limit_active = true}, E),
+ {FunAcc, State};
+ true -> AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C, FunAcc,
+ State#q{active_consumers = AC1})
+ end
end.
+deliver_msg_to_consumer(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ {FunAcc1, State1}.
+
deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
@@ -589,43 +580,34 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
-add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-
remove_consumer(ChPid, ConsumerTag, Queue) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
- {Kept, Removed} = split_by_channel(ChPid, Queue),
- [emit_consumer_deleted(Ch, CTag) ||
- {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
- Kept.
-
-move_consumers(ChPid, From, To) ->
- {Kept, Removed} = split_by_channel(ChPid, From),
- {Kept, queue:join(To, Removed)}.
-
-split_by_channel(ChPid, Queue) ->
- {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue)),
- {queue:from_list(Kept), queue:from_list(Removed)}.
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found ->
State;
C ->
- NewC = update_ch_record(Update(C)),
- case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_message_queue(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ C1 = Update(C),
+ case ch_record_state_transition(C, C1) of
+ ok -> update_ch_record(C1),
+ State;
+ unblock -> #cr{blocked_consumers = Consumers} = C1,
+ update_ch_record(
+ C1#cr{blocked_consumers = queue:new()}),
+ AC1 = queue:join(State#q.active_consumers,
+ Consumers),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -637,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = Blocked} ->
+ _ = remove_consumers(ChPid, Blocked), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -645,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)},
+ ChPid, State#q.active_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -665,8 +648,15 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
- queue:is_empty(State#q.blocked_consumers).
+consumer_count() -> consumer_count(fun (_) -> false end).
+
+active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
+
+consumer_count(Exclude) ->
+ lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
+ not Exclude(C)]).
+
+is_unused(_State) -> consumer_count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -792,8 +782,8 @@ i(messages_unacknowledged, _) ->
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
-i(consumers, State) ->
- queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
+i(consumers, _) ->
+ consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -809,13 +799,15 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+consumers(#q{active_consumers = ActiveConsumers}) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(ActiveConsumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
+ [{ChPid, CTag, AckRequired} | Acc1]
+ end, Acc, Consumers).
emit_stats(State) ->
emit_stats(State, []).
@@ -982,17 +974,14 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
+ E = {ChPid, Consumer},
State2 =
case is_ch_blocked(C1) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.active_consumers)})
+ true -> block_consumer(C1, E),
+ State1;
+ false -> update_ch_record(C1),
+ AC1 = queue:in(E, State1#q.active_consumers),
+ run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
@@ -1005,9 +994,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C ->
- update_consumer_count(C, -1),
+ C = #cr{blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag),
+ Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1015,10 +1005,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.blocked_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> {stop, normal, ok, State1}
@@ -1026,10 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
- active_consumers = ActiveConsumers} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
+ reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->