summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl719
1 files changed, 423 insertions, 296 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eb80e437..6fbbc93e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,21 +20,22 @@
-behaviour(gen_server2).
--export([start_link/7, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1]).
+-export([refresh_config_all/0, emit_stats/1, ready_for_close/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, format_message_queue/2]).
--record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
- start_limiter_fun, transaction_id, tx_participants, next_tag,
- uncommitted_ack_q, unacked_message_q,
+-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
+ limiter_pid, start_limiter_fun, tx_status, next_tag,
+ unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, confirmed}).
+ consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
+ stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -45,6 +46,7 @@
consumer_count,
messages_unacknowledged,
messages_unconfirmed,
+ messages_uncommitted,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -66,10 +68,10 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/7 ::
- (channel_number(), pid(), pid(), rabbit_types:user(),
- rabbit_types:vhost(), pid(),
- fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
+-spec(start_link/10 ::
+ (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
+ rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+ pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
@@ -88,16 +90,19 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
+-spec(refresh_config_all/0 :: () -> 'ok').
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(ready_for_close/1 :: (pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun) ->
- gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, User,
- VHost, CollectorPid, StartLimiterFun], []).
+start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun) ->
+ gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
+ VHost, Capabilities, CollectorPid, StartLimiterFun], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -106,7 +111,7 @@ do(Pid, Method, Content) ->
gen_server2:cast(Pid, {method, Method, Content}).
flush(Pid) ->
- gen_server2:call(Pid, flush).
+ gen_server2:call(Pid, flush, infinity).
shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
@@ -143,38 +148,52 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+refresh_config_all() ->
+ rabbit_misc:upmap(
+ fun (C) -> gen_server2:call(C, refresh_config) end, list()),
+ ok.
+
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
+ready_for_close(Pid) ->
+ gen_server2:cast(Pid, ready_for_close).
+
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
- StartLimiterFun]) ->
+init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
+ Capabilities, CollectorPid, StartLimiterFun]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
State = #ch{state = starting,
+ protocol = Protocol,
channel = Channel,
reader_pid = ReaderPid,
writer_pid = WriterPid,
+ conn_pid = ConnPid,
limiter_pid = undefined,
start_limiter_fun = StartLimiterFun,
- transaction_id = none,
- tx_participants = sets:new(),
+ tx_status = none,
next_tag = 1,
- uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
+ uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
+ consumer_monitors = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_trees:empty(),
- confirmed = []},
+ unconfirmed_mq = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
+ confirmed = [],
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -207,6 +226,9 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
+ reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -218,14 +240,11 @@ handle_cast({method, Method, Content}, State) ->
{noreply, NewState} ->
noreply(NewState);
stop ->
- {stop, normal, State#ch{state = terminating}}
+ {stop, normal, State}
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- {stop, normal, terminating(Reason#amqp_error{method = MethodName},
- State)};
- exit:normal ->
- {stop, normal, State};
+ send_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -233,9 +252,19 @@ handle_cast({method, Method, Content}, State) ->
handle_cast({flushed, QPid}, State) ->
{noreply, queue_blocked(QPid, State), hibernate};
+handle_cast(ready_for_close, State = #ch{state = closing,
+ writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+ {stop, normal, State};
+
handle_cast(terminate, State) ->
{stop, normal, State};
+handle_cast({command, #'basic.consume_ok'{consumer_tag = ConsumerTag} = Msg},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(WriterPid, Msg),
+ noreply(monitor_consumer(ConsumerTag, State));
+
handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
@@ -243,10 +272,11 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
@@ -257,12 +287,12 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
-
- maybe_incr_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast({confirm, MsgSeqNos, From}, State) ->
@@ -278,20 +308,18 @@ handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
State#ch{
stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
-handle_info({'DOWN', _MRef, process, QPid, Reason},
- State = #ch{unconfirmed = UC}) ->
- %% TODO: this does a complete scan and partial rebuild of the
- %% tree, which is quite efficient. To do better we'd need to
- %% maintain a secondary mapping, from QPids to MsgSeqNos.
- {MXs, UC1} = remove_queue_unconfirmed(
- gb_trees:next(gb_trees:iterator(UC)), QPid,
- {[], UC}, State),
- erase_queue_stats(QPid),
- State1 = case Reason of
- normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
- end,
- noreply(queue_blocked(QPid, State1)).
+handle_info({'DOWN', MRef, process, QPid, Reason},
+ State = #ch{consumer_monitors = ConsumerMonitors}) ->
+ noreply(
+ case dict:find(MRef, ConsumerMonitors) of
+ error ->
+ handle_publishing_queue_down(QPid, Reason, State);
+ {ok, ConsumerTag} ->
+ handle_consuming_queue_down(MRef, ConsumerTag, State)
+ end);
+
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State}.
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
@@ -303,22 +331,22 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
-terminate(_Reason, State = #ch{state = terminating}) ->
- terminate(State);
-
terminate(Reason, State) ->
- Res = rollback_and_notify(State),
+ {Res, _State1} = notify_queues(State),
case Reason of
normal -> ok = Res;
shutdown -> ok = Res;
{shutdown, _Term} -> ok = Res;
_ -> ok
end,
- terminate(State).
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> reply(Reply, [], NewState).
@@ -351,10 +379,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
- ok = rollback_and_notify(State),
- Reader ! {channel_exit, Channel, Reason},
- State#ch{state = terminating}.
+send_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
+ {CloseChannel, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
+ [ConnPid, Channel, Reason]),
+ %% something bad's happened: notify_queues may not be 'ok'
+ {_Result, State1} = notify_queues(State),
+ case CloseChannel of
+ Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
+ {noreply, State1};
+ _ -> ReaderPid ! {channel_exit, Channel, Reason},
+ {stop, normal, State1}
+ end.
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
@@ -476,13 +517,6 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-remove_queue_unconfirmed(none, _QPid, Acc, _State) ->
- Acc;
-remove_queue_unconfirmed({MsgSeqNo, XQ, Next}, QPid, Acc, State) ->
- remove_queue_unconfirmed(gb_trees:next(Next), QPid,
- remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State),
- State).
-
record_confirm(undefined, _, State) ->
State;
record_confirm(MsgSeqNo, XName, State) ->
@@ -495,25 +529,42 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} =
+confirm(MsgSeqNos, QPid, State) ->
+ {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
+ record_confirms(MXs, State1).
+
+process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
+ unconfirmed_qm = UQM}) ->
+ {MXs, UMQ1, UQM1} =
lists:foldl(
- fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UC0) of
- none -> Acc;
- {value, XQ} -> remove_qmsg(MsgSeqNo, QPid, XQ, Acc, State)
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UMQ0) of
+ {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
+ Acc, Nack);
+ none -> Acc
end
- end, {[], UC}, MsgSeqNos),
- record_confirms(MXs, State#ch{unconfirmed = UC1}).
-
-remove_qmsg(MsgSeqNo, QPid, {XName, Qs}, {MXs, UC}, State) ->
- Qs1 = sets:del_element(QPid, Qs),
- %% these confirms will be emitted even when a queue dies, but that
- %% should be fine, since the queue stats get erased immediately
- maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
- case sets:size(Qs1) of
- 0 -> {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UC)};
- _ -> {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UC)}
+ end, {[], UMQ, UQM}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
+ UQM1 = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> gb_trees:delete(QPid, UQM);
+ false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
+ end;
+ none ->
+ UQM
+ end,
+ Qs1 = gb_sets:del_element(QPid, Qs),
+ %% If QPid somehow died initiating a nack, clear the message from
+ %% internal data-structures. Also, cleanup empty entries.
+ case (Nack orelse gb_sets:is_empty(Qs1)) of
+ true ->
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ false ->
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -526,11 +577,29 @@ handle_method(#'channel.open'{}, _, _State) ->
handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
+handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
stop;
+handle_method(#'channel.close'{}, _, State = #ch{state = closing}) ->
+ {reply, #'channel.close_ok'{}, State};
+
+handle_method(_Method, _, State = #ch{state = closing}) ->
+ {noreply, State};
+
+handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+ {ok, State1} = notify_queues(State),
+ ReaderPid ! {channel_closing, self()},
+ {noreply, State1};
+
+%% Even though the spec prohibits the client from sending commands
+%% while waiting for the reply to a synchronous command, we generally
+%% do allow this...except in the case of a pending tx.commit, where
+%% it could wreak havoc.
+handle_method(_Method, _, #ch{tx_status = TxStatus})
+ when TxStatus =/= none andalso TxStatus =/= in_progress ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected command while processing 'tx.commit'", []);
+
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -539,8 +608,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
- confirm_enabled = ConfirmEnabled}) ->
+ tx_status = TxStatus,
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -549,32 +619,29 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
- IsPersistent = is_message_persistent(DecodedContent),
{MsgSeqNo, State1} =
- case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case {TxStatus, ConfirmEnabled} of
+ {none, false} -> {undefined, State};
+ {_, _} -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName,
- MsgSeqNo, Message, State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
+ {ok, Message} ->
+ rabbit_trace:tap_trace_in(Message, TraceState),
+ Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
+ MsgSeqNo),
+ QNames = rabbit_exchange:route(Exchange, Delivery),
+ {noreply,
+ case TxStatus of
+ none -> deliver_to_queues({Delivery, QNames}, State1);
+ in_progress -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ}
+ end};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid message: ~p", [Reason])
+ end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
multiple = Multiple,
@@ -584,46 +651,42 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{transaction_id = TxnKey,
- unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QIncs = ack(TxnKey, Acked),
- Participants = [QPid || {QPid, _} <- QIncs],
- maybe_incr_stats(QIncs, ack, State),
- {noreply, case TxnKey of
- none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
- State#ch{unacked_message_q = Remaining};
- _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
- Acked),
- add_tx_participants(
- Participants,
- State#ch{unacked_message_q = Remaining,
- uncommitted_ack_q = NewUAQ})
- end};
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none -> ack(Acked, State1);
+ in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- reader_pid = ReaderPid,
- next_tag = DeliveryTag}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -643,9 +706,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid,
- limiter_pid = LimiterPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{conn_pid = ConnPid,
+ limiter_pid = LimiterPid,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
@@ -660,20 +723,26 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
%% behalf. This is for symmetry with basic.cancel - see
%% the comment in that method for why.
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) ->
- rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
- ActualConsumerTag, ExclusiveConsume,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag}))
+ {rabbit_amqqueue:basic_consume(
+ Q, NoAck, self(), LimiterPid,
+ ActualConsumerTag, ExclusiveConsume,
+ ok_msg(NoWait, #'basic.consume_ok'{
+ consumer_tag = ActualConsumerTag})),
+ Q}
end) of
- ok ->
- {noreply, State#ch{consumer_mapping =
- dict:store(ActualConsumerTag,
- QueueName,
- ConsumerMapping)}};
- {error, exclusive_consume_unavailable} ->
+ {ok, Q} ->
+ State1 = State#ch{consumer_mapping =
+ dict:store(ActualConsumerTag,
+ {Q, undefined},
+ ConsumerMapping)},
+ {noreply,
+ case NoWait of
+ true -> monitor_consumer(ActualConsumerTag, State1);
+ false -> State1
+ end};
+ {{error, exclusive_consume_unavailable}, _Q} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
@@ -686,26 +755,31 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
- {ok, QueueName} ->
- NewState = State#ch{consumer_mapping =
- dict:erase(ConsumerTag,
- ConsumerMapping)},
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- %% In order to ensure that no more messages
- %% are sent to the consumer after the
- %% cancel_ok has been sent, we get the
- %% queue process to send the cancel_ok on
- %% our behalf. If we were sending the
- %% cancel_ok ourselves it might overtake a
- %% message sent previously by the queue.
+ {ok, {Q, MRef}} ->
+ ConsumerMonitors1 =
+ case MRef of
+ undefined -> ConsumerMonitors;
+ _ -> true = erlang:demonitor(MRef),
+ dict:erase(MRef, ConsumerMonitors)
+ end,
+ NewState = State#ch{consumer_mapping = dict:erase(ConsumerTag,
+ ConsumerMapping),
+ consumer_monitors = ConsumerMonitors1},
+ %% In order to ensure that no more messages are sent to
+ %% the consumer after the cancel_ok has been sent, we get
+ %% the queue process to send the cancel_ok on our
+ %% behalf. If we were sending the cancel_ok ourselves it
+ %% might overtake a message sent previously by the queue.
+ case rabbit_misc:with_exit_handler(
+ fun () -> {error, not_found} end,
+ fun () ->
rabbit_amqqueue:basic_cancel(
Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
@@ -816,7 +890,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_configure_permitted(ExchangeName, State),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -864,10 +937,10 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid,
+ conn_pid = ConnPid,
queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
- true -> ReaderPid;
+ true -> ConnPid;
false -> none
end,
ActualNameBin = case QueueNameBin of
@@ -910,13 +983,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ conn_pid = ConnPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- check_configure_permitted(QueueName, State),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ReaderPid),
+ ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
State);
@@ -924,11 +996,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
if_unused = IfUnused,
if_empty = IfEmpty,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
rabbit_misc:protocol_error(
@@ -960,42 +1032,42 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
- _, State = #ch{reader_pid = ReaderPid}) ->
+ _, State = #ch{conn_pid = ConnPid}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ReaderPid,
+ QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from confirm to tx mode", []);
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
- {reply, #'tx.select_ok'{}, new_tx(State)};
-
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State};
+ {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
-handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State) ->
- {reply, #'tx.commit_ok'{}, internal_commit(State)};
+handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_ack_q = TAQ}) ->
+ State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
+ {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State) ->
- {reply, #'tx.rollback_ok'{}, internal_rollback(State)};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = TAQ}) ->
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
-handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
- when TxId =/= none ->
+handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
@@ -1035,10 +1107,63 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ capabilities = Capabilities}) ->
+ case rabbit_misc:table_lookup(
+ Capabilities, <<"consumer_cancel_notify">>) of
+ {bool, true} ->
+ {#amqqueue{pid = QPid} = Q, undefined} =
+ dict:fetch(ConsumerTag, ConsumerMapping),
+ MRef = erlang:monitor(process, QPid),
+ State#ch{consumer_mapping =
+ dict:store(ConsumerTag, {Q, MRef}, ConsumerMapping),
+ consumer_monitors =
+ dict:store(MRef, ConsumerTag, ConsumerMonitors)};
+ _ ->
+ State
+ end.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
+ MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSet} -> gb_sets:to_list(MsgSet);
+ none -> []
+ end,
+ %% We remove the MsgSeqNos from UQM before calling
+ %% process_confirms to prevent each MsgSeqNo being removed from
+ %% the set one by one which which would be inefficient
+ State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
+ {Nack, SendFun} =
+ case Reason of
+ Reason when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown ->
+ {false, fun record_confirms/2};
+ {shutdown, _} ->
+ {false, fun record_confirms/2};
+ _ ->
+ {true, fun send_nacks/2}
+ end,
+ {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
+ erase_queue_stats(QPid),
+ State3 = SendFun(MXs, State2),
+ queue_blocked(QPid, State3).
+
+handle_consuming_queue_down(MRef, ConsumerTag,
+ State = #ch{consumer_mapping = ConsumerMapping,
+ consumer_monitors = ConsumerMonitors,
+ writer_pid = WriterPid}) ->
+ ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
+ ConsumerMonitors1 = dict:erase(MRef, ConsumerMonitors),
+ Cancel = #'basic.cancel'{consumer_tag = ConsumerTag,
+ nowait = true},
+ ok = rabbit_writer:send_command(WriterPid, Cancel),
+ State#ch{consumer_mapping = ConsumerMapping1,
+ consumer_monitors = ConsumerMonitors1}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
- reader_pid = ReaderPid}) ->
+ conn_pid = ConnPid }) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -1054,7 +1179,7 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
key = ActualRoutingKey,
args = Arguments},
fun (_X, Q = #amqqueue{}) ->
- try rabbit_amqqueue:check_exclusive_access(Q, ReaderPid)
+ try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
catch exit:Reason -> {error, Reason}
end;
(_X, #exchange{}) ->
@@ -1079,11 +1204,10 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
end.
basic_return(#basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
+ routing_keys = [RoutingKey | _CcRoutes],
content = Content},
- WriterPid, Reason) ->
- {_Close, ReplyCode, ReplyText} =
- rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
+ #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) ->
+ {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
@@ -1128,52 +1252,24 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
-add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
- State#ch{tx_participants = sets:union(Participants,
- sets:from_list(MoreP))}.
-
-ack(TxnKey, UAQ) ->
- fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], UAQ).
-
-make_tx_id() -> rabbit_guid:guid().
-
-new_tx(State) ->
- State#ch{transaction_id = make_tx_id(),
- tx_participants = sets:new(),
- uncommitted_ack_q = queue:new()}.
-
-internal_commit(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants}) ->
- case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey, self()) of
- ok -> ok = notify_limiter(State#ch.limiter_pid,
- State#ch.uncommitted_ack_q),
- new_tx(State);
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "commit failed: ~w", [Errors])
- end.
+ack(Acked, State) ->
+ QIncs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{QPid, length(MsgIds)} | L]
+ end, [], Acked),
+ maybe_incr_stats(QIncs, ack, State),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State.
+
+new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new()}.
-internal_rollback(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants,
- uncommitted_ack_q = UAQ,
- unacked_message_q = UAMQ}) ->
- ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n",
- [self(),
- queue:len(UAQ),
- queue:len(UAMQ)]),
- ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey, self()),
- NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ}).
-
-rollback_and_notify(State = #ch{transaction_id = none}) ->
- notify_queues(State);
-rollback_and_notify(State) ->
- notify_queues(internal_rollback(State)).
+notify_queues(State = #ch{state = closing}) ->
+ {ok, State};
+notify_queues(State = #ch{consumer_mapping = Consumers}) ->
+ {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
+ State#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
@@ -1192,9 +1288,6 @@ start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
ok = limit_queues(LPid, State),
LPid.
-notify_queues(#ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
-
unlimit_queues(State) ->
ok = limit_queues(undefined, State),
undefined.
@@ -1203,16 +1296,9 @@ limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
consumer_queues(Consumers) ->
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end].
+ lists:usort([QPid ||
+ {_Key, {#amqqueue{pid = QPid}, _MRef}}
+ <- dict:to_list(Consumers)]).
%% tell the limiter about the number of acks that have been received
%% for messages delivered to subscribed consumers, but not acks for
@@ -1228,32 +1314,47 @@ notify_limiter(LimiterPid, Acked) ->
Count -> rabbit_limiter:ack(LimiterPid, Count)
end.
-is_message_persistent(Content) ->
- case rabbit_basic:is_message_persistent(Content) of
- {invalid, Other} ->
- rabbit_log:warning("Unknown delivery mode ~p - "
- "treating as 1, non-persistent~n",
- [Other]),
- false;
- IsPersistent when is_boolean(IsPersistent) ->
- IsPersistent
- end.
+deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
+ exchange_name = XName},
+ msg_seq_no = MsgSeqNo},
+ QNames}, State) ->
+ {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State),
+ maybe_incr_stats([{XName, 1} |
+ [{{QPid, XName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State1),
+ State1.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_route),
+ ok = basic_return(Msg, State, no_route),
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State#ch.writer_pid, no_consumers),
+ ok = basic_return(Msg, State, no_consumers),
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed = UC} = State,
- [maybe_monitor(QPid) || QPid <- QPids],
- UC1 = gb_trees:insert(MsgSeqNo, {XName, sets:from_list(QPids)}, UC),
- State#ch{unconfirmed = UC1}.
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
+ SingletonSet = gb_sets:singleton(MsgSeqNo),
+ UQM1 = lists:foldl(
+ fun (QPid, UQM2) ->
+ maybe_monitor(QPid),
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
+ gb_trees:update(QPid, MsgSeqNos1, UQM2);
+ none ->
+ gb_trees:insert(QPid, SingletonSet, UQM2)
+ end
+ end, UQM, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1262,20 +1363,25 @@ lock_message(false, _MsgStruct, State) ->
send_nacks([], State) ->
State;
-send_nacks(MXs, State) ->
+send_nacks(MXs, State = #ch{tx_status = none}) ->
MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
coalesce_and_send(MsgSeqNos,
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State);
+send_nacks(_, State) ->
+ maybe_complete_tx(State#ch{tx_status = failed}).
-send_confirms(State = #ch{confirmed = C}) ->
+send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
MsgSeqNo
end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []}).
+ send_confirms(MsgSeqNos, State #ch{confirmed = []});
+send_confirms(State) ->
+ maybe_complete_tx(State).
+
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1289,11 +1395,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UMQ) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
+ false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1305,28 +1411,44 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-terminate(_State) ->
- pg_local:leave(rabbit_channels, self()),
- rabbit_event:notify(channel_closed, [{pid, self()}]).
+maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ State;
+maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
+ case gb_trees:is_empty(UMQ) of
+ false -> State;
+ true -> complete_tx(State#ch{confirmed = []})
+ end.
+
+complete_tx(State = #ch{tx_status = committing}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ State#ch{tx_status = in_progress};
+complete_tx(State = #ch{tx_status = failed}) ->
+ {noreply, State1} = send_exception(
+ rabbit_misc:amqp_error(
+ precondition_failed, "partial tx completion", [],
+ 'tx.commit'),
+ State),
+ State1#ch{tx_status = in_progress}.
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-i(pid, _) -> self();
-i(connection, #ch{reader_pid = ReaderPid}) -> ReaderPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
-i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(pid, _) -> self();
+i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
+i(number, #ch{channel = Channel}) -> Channel;
+i(user, #ch{user = User}) -> User#user.username;
+i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(transactional, #ch{tx_status = TE}) -> TE =/= none;
+i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
- gb_trees:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = UAQ}) ->
- queue:len(UAMQ) + queue:len(UAQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
- queue:len(UAQ);
+i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
+ gb_trees:size(UMQ);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
+ queue:len(UAMQ);
+i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
+ queue:len(TMQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
+ queue:len(TAQ);
i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
@@ -1334,6 +1456,11 @@ i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+maybe_incr_redeliver_stats(true, QPid, State) ->
+ maybe_incr_stats([{QPid, 1}], redeliver, State);
+maybe_incr_redeliver_stats(_, _, _) ->
+ ok.
+
maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];