summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-03-20 11:08:28 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-03-20 11:08:28 +0000
commit904c58e4fcce4938c6a8725f484e2b78813c6986 (patch)
treec7db7336a88d46ad5216ea5cee8a8b604701f9e1 /src/rabbit_channel.erl
parent964f43befe1a48e5256ee1bfb77bc6b7422e4c05 (diff)
parentd8baacd25557f3bee697dd867bd3c32e1d1ba874 (diff)
downloadrabbitmq-server-904c58e4fcce4938c6a8725f484e2b78813c6986.tar.gz
stable to default
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl522
1 files changed, 284 insertions, 238 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a715b291..792a06c9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,10 +33,10 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx_status, next_tag, unacked_message_q,
- uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
- virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, delivering_queues,
+ conn_name, limiter, tx, next_tag, unacked_message_q, user,
+ virtual_host, most_recently_declared_queue,
+ queue_names, queue_monitors, consumer_mapping,
+ blocking, queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, capabilities, trace_state}).
@@ -64,6 +64,12 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INCR_STATS(Incs, Measure, State),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
+ fine -> incr_stats(Incs, Measure);
+ _ -> ok
+ end).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -185,15 +191,13 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
conn_pid = ConnPid,
conn_name = ConnName,
limiter = Limiter,
- tx_status = none,
+ tx = none,
next_tag = 1,
unacked_message_q = queue:new(),
- uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ queue_names = dict:new(),
queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
@@ -314,9 +318,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
+
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
- noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+ Timeout = case C of [] -> hibernate; _ -> 0 end,
+ %% NB: don't call noreply/1 since we don't want to send confirms.
+ {noreply, ensure_stats_timer(State1), Timeout}.
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
@@ -327,8 +334,10 @@ handle_info(timeout, State) ->
handle_info(emit_stats, State) ->
emit_stats(State),
- noreply([ensure_stats_timer],
- rabbit_event:reset_stats_timer(State, #ch.stats_timer));
+ State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
+ %% NB: don't call noreply/1 since we don't want to kick off the
+ %% stats timer.
+ {noreply, send_confirms(State1), hibernate};
handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
@@ -336,9 +345,13 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State3 = handle_consuming_queue_down(QPid, State2),
State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
- erase_queue_stats(QPid),
- noreply(State4#ch{queue_monitors = pmon:erase(
- QPid, State4#ch.queue_monitors)});
+ #ch{queue_names = QNames, queue_monitors = QMons} = State4,
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> erase_queue_stats(QName);
+ error -> ok
+ end,
+ noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
+ queue_monitors = pmon:erase(QPid, QMons)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -368,30 +381,11 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> reply(Reply, [], NewState).
-
-reply(Reply, Mask, NewState) -> reply(Reply, Mask, NewState, hibernate).
-
-reply(Reply, Mask, NewState, Timeout) ->
- {reply, Reply, next_state(Mask, NewState), Timeout}.
+reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-noreply(NewState) -> noreply([], NewState).
+noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-noreply(Mask, NewState) -> noreply(Mask, NewState, hibernate).
-
-noreply(Mask, NewState, Timeout) ->
- {noreply, next_state(Mask, NewState), Timeout}.
-
--define(MASKED_CALL(Fun, Mask, State),
- case lists:member(Fun, Mask) of
- true -> State;
- false -> Fun(State)
- end).
-
-next_state(Mask, State) ->
- State1 = ?MASKED_CALL(ensure_stats_timer, Mask, State),
- State2 = ?MASKED_CALL(send_confirms, Mask, State1),
- State2.
+next_state(State) -> ensure_stats_timer(send_confirms(State)).
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
@@ -425,8 +419,14 @@ handle_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+-ifdef(use_specs).
+-spec(precondition_failed/1 :: (string()) -> no_return()).
+-endif.
precondition_failed(Format) -> precondition_failed(Format, []).
+-ifdef(use_specs).
+-spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
+-endif.
precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
@@ -443,15 +443,13 @@ check_resource_access(User, Resource, Perm) ->
undefined -> [];
Other -> Other
end,
- CacheTail =
- case lists:member(V, Cache) of
- true -> lists:delete(V, Cache);
- false -> ok = rabbit_access_control:check_resource_access(
- User, Resource, Perm),
- lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
- end,
- put(permission_cache, [V | CacheTail]),
- ok.
+ case lists:member(V, Cache) of
+ true -> ok;
+ false -> ok = rabbit_access_control:check_resource_access(
+ User, Resource, Perm),
+ CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
+ put(permission_cache, [V | CacheTail])
+ end.
clear_permission_cache() ->
erase(permission_cache),
@@ -530,16 +528,12 @@ check_not_default_exchange(_) ->
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
-%% One, quite reasonable, interpretation of the spec, taken by the
-%% QPid M1 Java client, is that the exclusion of "amq." prefixed names
+%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
%% only applies on actual creation, and not in the cases where the
-%% entity already exists. This is how we use this function in the code
-%% below. However, AMQP JIRA 123 changes that in 0-10, and possibly
-%% 0-9SP1, making it illegal to attempt to declare an exchange/queue
-%% with an amq.* name when passive=false. So this will need
-%% revisiting.
+%% entity already exists or passive=true.
%%
-%% TODO: enforce other constraints on name. See AMQP JIRA 69.
+%% NB: We deliberately do not enforce the other constraints on names
+%% required by the spec.
check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
rabbit_misc:protocol_error(
access_refused,
@@ -550,18 +544,16 @@ check_name(_Kind, NameBin) ->
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case sets:is_element(QPid, Blocking) of
false -> State;
- true -> Blocking1 = sets:del_element(QPid, Blocking),
- case sets:size(Blocking1) of
- 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
- _ -> ok
- end,
- State#ch{blocking = Blocking1}
+ true -> maybe_send_flow_ok(
+ State#ch{blocking = sets:del_element(QPid, Blocking)})
end.
-record_confirm(undefined, _, State) ->
- State;
-record_confirm(MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State).
+maybe_send_flow_ok(State = #ch{blocking = Blocking}) ->
+ case sets:size(Blocking) of
+ 0 -> ok = send(#'channel.flow_ok'{active = false}, State);
+ _ -> ok
+ end,
+ State.
record_confirms([], State) ->
State;
@@ -597,6 +589,15 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
@@ -604,8 +605,8 @@ handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
%% while waiting for the reply to a synchronous command, we generally
%% do allow this...except in the case of a pending tx.commit, where
%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx_status = TxStatus})
- when TxStatus =/= none andalso TxStatus =/= in_progress ->
+handle_method(_Method, _, #ch{tx = Tx})
+ when Tx =:= committing orelse Tx =:= failed ->
rabbit_misc:protocol_error(
channel_error, "unexpected command while processing 'tx.commit'", []);
@@ -619,7 +620,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
- tx_status = TxStatus,
+ tx = Tx,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -633,23 +634,22 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
check_user_id_header(Props, State),
check_expiration_header(Props),
{MsgSeqNo, State1} =
- case {TxStatus, ConfirmEnabled} of
+ case {Tx, ConfirmEnabled} of
{none, false} -> {undefined, State};
{_, _} -> SeqNo = State#ch.publish_seqno,
{SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message, TraceState),
+ rabbit_trace:tap_in(Message, TraceState),
Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- {noreply,
- case TxStatus of
- none -> deliver_to_queues({Delivery, QNames}, State1);
- in_progress -> TMQ = State1#ch.uncommitted_message_q,
- NewTMQ = queue:in({Delivery, QNames}, TMQ),
- State1#ch{uncommitted_message_q = NewTMQ}
- end};
+ DQ = {Delivery, QNames},
+ {noreply, case Tx of
+ none -> deliver_to_queues(DQ, State1);
+ {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
+ State1#ch{tx = {Msgs1, Acks}}
+ end};
{error, Reason} ->
precondition_failed("invalid message: ~p", [Reason])
end;
@@ -662,16 +662,15 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none -> ack(Acked, State1),
- State1;
- in_progress -> State1#ch{uncommitted_acks =
- Acked ++ State1#ch.uncommitted_acks}
- end};
+ {noreply, case Tx of
+ none -> ack(Acked, State1),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -684,7 +683,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -696,7 +695,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- State1 = monitor_delivering_queue(NoAck, QPid, State),
+ State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
@@ -735,10 +734,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q = #amqqueue{pid = QPid}} ->
+ {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
State1 = monitor_delivering_queue(
- NoAck, QPid, State#ch{consumer_mapping = CM1}),
+ NoAck, QPid, QName,
+ State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -822,14 +822,12 @@ handle_method(#'basic.recover_async'{requeue = true},
limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_misc:with_exit_handler(
- OkFun, fun () ->
- rabbit_amqqueue:requeue(
- QPid, MsgIds, self())
- end)
- end, ok, UAMQL),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_misc:with_exit_handler(
+ OkFun,
+ fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
@@ -1044,34 +1042,34 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
precondition_failed("cannot switch from confirm to tx mode");
+handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
+ {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
+
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
+ {reply, #'tx.select_ok'{}, State};
-handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
-handle_method(#'tx.commit'{}, _,
- State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL,
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
- ack(TAL, State1),
- lists:foreach(
- fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
- {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
-
-handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
+handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
+ lists:foreach(fun ({ack, A}) -> ack(A, State1);
+ ({Requeue, A}) -> reject(Requeue, A, Limiter)
+ end, lists:reverse(Acks)),
+ {noreply, maybe_complete_tx(State1#ch{tx = committing})};
+
+handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL,
- uncommitted_nacks = TNL}) ->
- TNL1 = lists:append([L || {_, L} <- TNL]),
- UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
+ tx = {_Msgs, Acks}}) ->
+ AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
+ UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
+ {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
+ tx = new_tx()}};
-handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
+handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
@@ -1096,12 +1094,9 @@ handle_method(#'channel.flow'{active = false}, _,
end,
State1 = State#ch{limiter = Limiter1},
ok = rabbit_limiter:block(Limiter1),
- case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State2}
- end;
+ QPids = consumer_queues(Consumers),
+ ok = rabbit_amqqueue:flush_all(QPids, self()),
+ {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})};
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -1130,9 +1125,12 @@ consumer_monitor(ConsumerTag,
State
end.
-monitor_delivering_queue(NoAck, QPid, State = #ch{queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+monitor_delivering_queue(NoAck, QPid, QName,
+ State = #ch{queue_names = QNames,
+ queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_names = dict:store(QPid, QName, QNames),
+ queue_monitors = pmon:monitor(QPid, QMons),
delivering_queues = case NoAck of
true -> DQ;
false -> sets:add_element(QPid, DQ)
@@ -1196,6 +1194,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(DestinationName)]);
+ {error, {binding_invalid, Fmt, Args}} ->
+ rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
{error, #amqp_error{} = Error} ->
rabbit_misc:protocol_error(Error);
ok -> return_ok(State, NoWait, ReturnMethod)
@@ -1215,42 +1215,40 @@ basic_return(#basic_message{exchange_name = ExchangeName,
Content).
reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
+ State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
State1 = State#ch{unacked_message_q = Remaining},
- {noreply,
- case TxStatus of
- none ->
- reject(Requeue, Acked, State1#ch.limiter),
- State1;
- in_progress ->
- State1#ch{uncommitted_nacks =
- [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
- end}.
-
+ {noreply, case Tx of
+ none -> reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
+ State1#ch{tx = {Msgs, Acks1}}
+ end}.
+
+%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- ok = fold_per_queue(
- fun (QPid, MsgIds, ok) ->
- rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
- end, ok, Acked),
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, Acked),
ok = notify_limiter(Limiter, Acked).
record_sent(ConsumerTag, AckRequired,
- Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ Msg = {QName, QPid, MsgId, Redelivered, _Message},
State = #ch{unacked_message_q = UAMQ,
next_tag = DeliveryTag,
trace_state = TraceState}) ->
- incr_stats([{queue_stats, QPid, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> incr_stats([{queue_stats, QPid, 1}], redeliver, State);
+ true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
false -> ok
end,
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ rabbit_trace:tap_out(Msg, TraceState),
UAMQ1 = case AckRequired of
true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
UAMQ);
@@ -1258,40 +1256,61 @@ record_sent(ConsumerTag, AckRequired,
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
+%% NB: returns acks in youngest-first order
collect_acks(Q, 0, true) ->
- {queue:to_list(Q), queue:new()};
+ {lists:reverse(queue:to_list(Q)), queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], queue:new(), Q, DeliveryTag, Multiple).
+ collect_acks([], [], Q, DeliveryTag, Multiple).
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case queue:out(Q) of
{{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc], queue:join(PrefixAcc, QTail)};
+ {[UnackedMsg | ToAcc],
+ case PrefixAcc of
+ [] -> QTail;
+ _ -> queue:join(
+ queue:from_list(lists:reverse(PrefixAcc)),
+ QTail)
+ end};
Multiple ->
collect_acks([UnackedMsg | ToAcc], PrefixAcc,
QTail, DeliveryTag, Multiple);
true ->
- collect_acks(ToAcc, queue:in(UnackedMsg, PrefixAcc),
+ collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
-ack(Acked, State) ->
- Incs = fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- [{queue_stats, QPid, length(MsgIds)} | L]
- end, [], Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- incr_stats(Incs, ack, State).
-
-new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = [],
- uncommitted_nacks = []}.
+%% NB: Acked is in youngest-first order
+ack(Acked, State = #ch{queue_names = QNames}) ->
+ foreach_per_queue(
+ fun (QPid, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ ?INCR_STATS(case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ [{queue_stats, QName, Count}];
+ error -> []
+ end, ack, State)
+ end, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked).
+
+%% {Msgs, Acks}
+%%
+%% Msgs is a queue.
+%%
+%% Acks looks s.t. like this:
+%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
+%%
+%% Each element is a pair consisting of a tag and a list of
+%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
+%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
+%% well as the list overall, are in "most-recent (generally youngest)
+%% ack first" order.
+new_tx() -> {queue:new(), []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1301,15 +1320,17 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-fold_per_queue(_F, Acc, []) ->
- Acc;
-fold_per_queue(F, Acc, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId], Acc);
-fold_per_queue(F, Acc, UAL) ->
+foreach_per_queue(_F, []) ->
+ ok;
+foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
+ F(QPid, [MsgId]);
+%% NB: UAL should be in youngest-first order; the tree values will
+%% then be in oldest-first order
+foreach_per_queue(F, UAL) ->
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(F, Acc, T).
+ rabbit_misc:gb_trees_foreach(F, T).
enable_limiter(State = #ch{unacked_message_q = UAMQ,
limiter = Limiter}) ->
@@ -1332,65 +1353,93 @@ notify_limiter(Limiter, Acked) ->
case rabbit_limiter:is_enabled(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
+ ({_, _, _}, Acc) -> Acc + 1
end, 0, Acked) of
0 -> ok;
Count -> rabbit_limiter:ack(Limiter, Count)
end
end.
+deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
+ msg_seq_no = undefined,
+ mandatory = false},
+ []}, State) -> %% optimisation
+ ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
- QNames}, State) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = State#ch{queue_monitors =
- pmon:monitor_all(DeliveredQPids,
- State#ch.queue_monitors)},
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State1),
- incr_stats([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- State2.
+ DelQNames}, State = #ch{queue_names = QNames,
+ queue_monitors = QMons}) ->
+ Qs = rabbit_amqqueue:lookup(DelQNames),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ %% The pmon:monitor_all/2 monitors all queues to which we
+ %% delivered. But we want to monitor even queues we didn't deliver
+ %% to, since we need their 'DOWN' messages to clean
+ %% queue_names. So we also need to monitor each QPid from
+ %% queues. But that only gets the masters (which is fine for
+ %% cleaning queue_names), so we need the union of both.
+ %%
+ %% ...and we need to add even non-delivered queues to queue_names
+ %% since alternative algorithms to update queue_names less
+ %% frequently would in fact be more expensive in the common case.
+ {QNames1, QMons1} =
+ lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
+ {QNames0, QMons0}) ->
+ {case dict:is_key(QPid, QNames0) of
+ true -> QNames0;
+ false -> dict:store(QPid, QName, QNames0)
+ end, pmon:monitor(QPid, QMons0)}
+ end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message,
+ State#ch{queue_names = QNames1,
+ queue_monitors = QMons1}),
+ ?INCR_STATS([{exchange_stats, XName, 1} |
+ [{queue_exchange_stats, {QName, XName}, 1} ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]]],
+ publish, State1),
+ State1.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- incr_stats([{exchange_stats, Msg#basic_message.exchange_name, 1}],
- return_unroutable, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
- record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+ record_confirms([{MsgSeqNo, XName}], State);
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
+ State#ch.unconfirmed)};
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+ ok = basic_return(Msg, State, no_route),
+ ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State),
+ case MsgSeqNo of
+ undefined -> State;
+ _ -> record_confirms([{MsgSeqNo, XName}], State)
+ end.
send_nacks([], State) ->
State;
-send_nacks(_MXs, State = #ch{state = closing,
- tx_status = none}) -> %% optimisation
+send_nacks(_MXs, State = #ch{state = closing,
+ tx = none}) -> %% optimisation
State;
-send_nacks(MXs, State = #ch{tx_status = none}) ->
+send_nacks(MXs, State = #ch{tx = none}) ->
coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
end, State);
send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
- State#ch{tx_status = failed};
+ State#ch{tx = failed};
send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx_status = failed}).
+ maybe_complete_tx(State#ch{tx = failed}).
-send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
+send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
-send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
+send_confirms(State = #ch{tx = none, confirmed = C}) ->
MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- incr_stats([{exchange_stats, XName, 1}], confirm, State),
+ ?INCR_STATS([{exchange_stats, XName, 1}], confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
send_confirms(MsgSeqNos, State#ch{confirmed = []});
@@ -1424,7 +1473,12 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
[ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
State.
-maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
+ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
+
+ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
+
+maybe_complete_tx(State = #ch{tx = {_, _}}) ->
State;
maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
case dtree:is_empty(UC) of
@@ -1432,16 +1486,16 @@ maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
true -> complete_tx(State#ch{confirmed = []})
end.
-complete_tx(State = #ch{tx_status = committing}) ->
+complete_tx(State = #ch{tx = committing}) ->
ok = send(#'tx.commit_ok'{}, State),
- State#ch{tx_status = in_progress};
-complete_tx(State = #ch{tx_status = failed}) ->
+ State#ch{tx = new_tx()};
+complete_tx(State = #ch{tx = failed}) ->
{noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
State),
- State1#ch{tx_status = in_progress}.
+ State1#ch{tx = new_tx()}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
@@ -1450,19 +1504,16 @@ i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
- dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
- queue:len(UAMQ);
-i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
- queue:len(TMQ);
-i(acks_uncommitted, #ch{uncommitted_acks = TAL}) ->
- length(TAL);
+i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
+i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
+i(messages_uncommitted, #ch{}) -> 0;
+i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
+i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
@@ -1473,12 +1524,8 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure, State) ->
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> [update_measures(Type, Key, Inc, Measure) ||
- {Type, Key, Inc} <- Incs];
- _ -> ok
- end.
+incr_stats(Incs, Measure) ->
+ [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
update_measures(Type, Key, Inc, Measure) ->
Measures = case get({Type, Key}) of
@@ -1495,24 +1542,23 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- CoarseStats = infos(?STATISTICS_KEYS, State),
+ Coarse = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse ->
- rabbit_event:notify(channel_stats, Extra ++ CoarseStats);
- fine ->
- FineStats =
- [{channel_queue_stats,
- [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
- {channel_exchange_stats,
- [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats,
- Extra ++ CoarseStats ++ FineStats)
+ coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
+ fine -> Fine = [{channel_queue_stats,
+ [{QName, Stats} ||
+ {{queue_stats, QName}, Stats} <- get()]},
+ {channel_exchange_stats,
+ [{XName, Stats} ||
+ {{exchange_stats, XName}, Stats} <- get()]},
+ {channel_queue_exchange_stats,
+ [{QX, Stats} ||
+ {{queue_exchange_stats, QX}, Stats} <- get()]}],
+ rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
end.
-erase_queue_stats(QPid) ->
- erase({queue_stats, QPid}),
+erase_queue_stats(QName) ->
+ erase({queue_stats, QName}),
[erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
+ {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
+ QName0 =:= QName].