summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-13 16:00:43 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-13 16:00:43 +0000
commita8a3f2ff7aa096517e5b5fff8b90cc35b6ff9db4 (patch)
tree7887cd7ee98d194daf22544512ef7a265125d3cb
parent38caabb071297c1745d96971cb41e92bfd90406b (diff)
downloadrabbitmq-server-a8a3f2ff7aa096517e5b5fff8b90cc35b6ff9db4.tar.gz
Move rabbit_flow:send invocation from router into channel; monitor all queues in the channel.
-rw-r--r--src/rabbit_channel.erl130
-rw-r--r--src/rabbit_router.erl1
2 files changed, 60 insertions, 71 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d87dc5d1..db9ff7a0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -188,7 +188,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -303,13 +303,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -338,7 +338,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
rabbit_flow:receiver_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -536,7 +536,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -574,8 +574,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -681,7 +680,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
@@ -705,11 +705,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -719,7 +719,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -796,9 +796,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1079,8 +1078,8 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
+ ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ)),
+ State1 = new_tx(State),
{noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1120,12 +1119,8 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
+ QPids -> ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, State1}
end;
handle_method(_MethodRecord, _Content, _State) ->
@@ -1154,16 +1149,9 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case not dict:is_key(QPid, QMons) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
@@ -1357,24 +1345,32 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
+ mandatory = Mandatory,
+ immediate = Immediate,
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ case Mandatory orelse Immediate of
+ false -> [rabbit_flow:send(QPid) || QPid <- DeliveredQPids];
+ _ -> ok
+ end,
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1392,7 +1388,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
@@ -1416,13 +1412,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1502,26 +1497,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index fd6b1265..31f5ad14 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -59,7 +59,6 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
- [rabbit_flow:send(QPid) || QPid <- QPids],
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};