author     Michael Klishin <michael@clojurewerkz.org>    2019-04-25 21:10:52 +0300
committer  Michael Klishin <michael@clojurewerkz.org>    2019-04-25 21:10:52 +0300
commit     99d0adebb332fa45e9507e7b8d5a73bd4ab34338 (patch)
tree       bf4e41bbf82592e5114901362a667d3fe7166894
parent     d279ccfee32764ad76517fb101015fa7ec77d9f5 (diff)
parent     e237612ae74990aff612a79b617d1dad438aaf51 (diff)
Merge branch 'master' into rabbitmq-erlang-client-91
-rw-r--r--   Makefile                                      3
-rw-r--r--   src/rabbit.erl                               33
-rw-r--r--   src/rabbit_amqqueue.erl                       4
-rw-r--r--   src/rabbit_channel.erl                      673
-rw-r--r--   src/rabbit_core_metrics_gc.erl                2
-rw-r--r--   src/rabbit_ssl.erl                           59
-rw-r--r--   src/rabbit_vm.erl                             4
-rw-r--r--   test/consumer_timeout_SUITE.erl             272
-rw-r--r--   test/queue_parallel_SUITE.erl               115
-rw-r--r--   test/quorum_queue_SUITE.erl                   2
-rw-r--r--   test/rabbit_core_metrics_gc_SUITE.erl         2
-rw-r--r--   test/unit_inbroker_non_parallel_SUITE.erl    98
12 files changed, 907 insertions, 360 deletions
diff --git a/Makefile b/Makefile
index 6620c8359b..9e818a8937 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,8 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
- {channel_queue_cleanup_interval, 60000},
+ %% interval at which the channel can perform periodic actions
+ {channel_tick_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
]
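
For operators, a minimal sketch (not part of this commit) of setting the renamed key, together with the new consumer_timeout it works with, via advanced.config; the consumer_timeout value below is illustrative, and leaving it unset keeps consumer timeouts disabled:

[
 {rabbit, [
           %% interval (ms) at which the channel performs periodic actions
           {channel_tick_interval, 60000},
           %% ms a delivery may stay unacknowledged before the consumer
           %% is considered timed out; omit to disable the check
           {consumer_timeout, 900000}
          ]}
].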
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 09b0494ce1..4a974fd682 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -320,13 +320,11 @@ ensure_config() ->
case rabbit_config:validate_config_files() of
ok -> ok;
{error, {ErrFmt, ErrArgs}} ->
- log_boot_error_and_exit(check_config_file, ErrFmt, ErrArgs)
+ throw({error, {check_config_file, ErrFmt, ErrArgs}})
end,
case rabbit_config:prepare_and_use_config() of
{error, {generation_error, Error}} ->
- log_boot_error_and_exit(generate_config_file,
- "~nConfig file generation failed ~s",
- Error);
+ throw({error, {generate_config_file, Error}});
ok -> ok
end.
@@ -463,9 +461,8 @@ start_it(StartFun) ->
true -> ok;
false -> StartFun()
end
- catch
- Class:Reason ->
- boot_error(Class, Reason)
+ catch Class:Reason ->
+ boot_error(Class, Reason)
after
unlink(Marker),
Marker ! stop,
@@ -1018,11 +1015,23 @@ boot_error(_, {could_not_start, rabbit, {{timeout_waiting_for_tables, _}, _}}) -
log_boot_error_and_exit(
timeout_waiting_for_tables,
"~n" ++ Err ++ rabbit_nodes:diagnostics(Nodes), []);
-boot_error(Class, {error, {cannot_log_to_file, _, _}} = Reason) ->
- log_boot_error_and_exit(
- Reason,
- "~nError description:~s",
- [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})]);
+boot_error(_, {error, {cannot_log_to_file, unknown, Reason}}) ->
+ log_boot_error_and_exit(could_not_initialise_logger,
+ "failed to initialised logger: ~p~n",
+ [Reason]);
+boot_error(_, {error, {cannot_log_to_file, LogFile,
+ {cannot_create_parent_dirs, _, Reason}}}) ->
+ log_boot_error_and_exit(could_not_initialise_logger,
+ "failed to create parent directory for log file at '~s', reason: ~p~n",
+ [LogFile, Reason]);
+boot_error(_, {error, {cannot_log_to_file, LogFile, Reason}}) ->
+ log_boot_error_and_exit(could_not_initialise_logger,
+ "failed to open log file at '~s', reason: ~p~n",
+ [LogFile, Reason]);
+boot_error(_, {error, {generate_config_file, Error}}) ->
+ log_boot_error_and_exit(generate_config_file,
+ "~nConfig file generation failed:~n~s~n",
+ [Error]);
boot_error(Class, Reason) ->
LogLocations = log_locations(),
log_boot_error_and_exit(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 167e6c96a6..9e94dd8f27 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -74,6 +74,7 @@
-export_type([name/0, qmsg/0, absent_reason/0]).
-type name() :: rabbit_types:r('queue').
+
-type qpids() :: [pid()].
-type qlen() :: rabbit_types:ok(non_neg_integer()).
-type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()).
@@ -1228,12 +1229,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) ->
NodeId = amqqueue:get_pid(Q),
rabbit_quorum_queue:purge(NodeId).
--spec requeue(pid(),
+-spec requeue(pid() | amqqueue:ra_server_id(),
{rabbit_fifo:consumer_tag(), [msg_id()]},
pid(),
quorum_states()) ->
'ok'.
-
requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) ->
ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}),
QuorumStates;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 59a8cce371..69add85057 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -66,8 +66,9 @@
-export([source/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
- prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
+ handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1,
+ prioritise_call/4, prioritise_cast/3, prioritise_info/3,
+ format_message_queue/2]).
-deprecated([{force_event_refresh, 1, eventually}]).
@@ -81,91 +82,96 @@
%% Mgmt HTTP API refactor
-export([handle_method/6]).
--record(ch, {
- %% starting | running | flow | closing
- state,
- %% same as reader's protocol. Used when instantiating
- %% (protocol) exceptions.
- protocol,
- %% channel number
- channel,
- %% reader process
- reader_pid,
- %% writer process
- writer_pid,
- %%
- conn_pid,
- %% same as reader's name, see #v1.name
- %% in rabbit_reader
- conn_name,
- %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
- %% or any other channel creating/spawning entity
- source,
- %% limiter pid, see rabbit_limiter
- limiter,
- %% none | {Msgs, Acks} | committing | failed |
- tx,
- %% (consumer) delivery tag sequence
- next_tag,
- %% messages pending consumer acknowledgement
- unacked_message_q,
- %% same as #v1.user in the reader, used in
- %% authorisation checks
- user,
- %% same as #v1.user in the reader
- virtual_host,
- %% when queue.bind's queue field is empty,
- %% this name will be used instead
- most_recently_declared_queue,
- %% a map of queue ref to queue name
- queue_names,
- %% queue processes are monitored to update
- %% queue names
- queue_monitors,
- %% a map of consumer tags to
- %% consumer details: #amqqueue record, acknowledgement mode,
- %% consumer exclusivity, etc
- consumer_mapping,
- %% a map of queue pids to consumer tag lists
- queue_consumers,
- %% a set of pids of queues that have unacknowledged
- %% deliveries
- delivering_queues,
- %% when a queue is declared as exclusive, queue
- %% collector must be notified.
- %% see rabbit_queue_collector for more info.
- queue_collector_pid,
- %% timer used to emit statistics
- stats_timer,
- %% are publisher confirms enabled for this channel?
- confirm_enabled,
- %% publisher confirm delivery tag sequence
- publish_seqno,
- %% a dtree used to track unconfirmed
- %% (to publishers) messages
- unconfirmed,
- %% a list of tags for published messages that were
- %% delivered but are yet to be confirmed to the client
- confirmed,
- %% a list of tags for published messages that were
- %% rejected but are yet to be sent to the client
- rejected,
- %% same as capabilities in the reader
- capabilities,
- %% tracing exchange resource if tracing is enabled,
- %% 'none' otherwise
- trace_state,
- consumer_prefetch,
- %% used by "one shot RPC" (amq.
- reply_consumer,
- %% flow | noflow, see rabbitmq-server#114
- delivery_flow,
- interceptor_state,
- queue_states,
- queue_cleanup_timer,
- %% Message content size limit
- max_message_size
-}).
+-record(conf, {
+ %% starting | running | flow | closing
+ state,
+ %% same as reader's protocol. Used when instantiating
+ %% (protocol) exceptions.
+ protocol,
+ %% channel number
+ channel,
+ %% reader process
+ reader_pid,
+ %% writer process
+ writer_pid,
+ %%
+ conn_pid,
+ %% same as reader's name, see #v1.name
+ %% in rabbit_reader
+ conn_name,
+ %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined
+ %% or any other channel creating/spawning entity
+ source,
+ %% same as #v1.user in the reader, used in
+ %% authorisation checks
+ user,
+ %% same as #v1.user in the reader
+ virtual_host,
+ %% when queue.bind's queue field is empty,
+ %% this name will be used instead
+ most_recently_declared_queue,
+ %% when a queue is declared as exclusive, queue
+ %% collector must be notified.
+ %% see rabbit_queue_collector for more info.
+ queue_collector_pid,
+
+ %% same as capabilities in the reader
+ capabilities,
+ %% tracing exchange resource if tracing is enabled,
+ %% 'none' otherwise
+ trace_state,
+ consumer_prefetch,
+ %% Message content size limit
+ max_message_size,
+ consumer_timeout
+ }).
+
+-record(ch, {cfg :: #conf{},
+ %% limiter state, see rabbit_limiter
+ limiter,
+ %% none | {Msgs, Acks} | committing | failed |
+ tx,
+ %% (consumer) delivery tag sequence
+ next_tag,
+ %% messages pending consumer acknowledgement
+ unacked_message_q,
+ %% a map of queue ref to queue name
+ queue_names,
+ %% queue processes are monitored to update
+ %% queue names
+ queue_monitors,
+ %% a map of consumer tags to
+ %% consumer details: #amqqueue record, acknowledgement mode,
+ %% consumer exclusivity, etc
+ consumer_mapping,
+ %% a map of queue pids to consumer tag lists
+ queue_consumers,
+ %% a set of pids of queues that have unacknowledged
+ %% deliveries
+ delivering_queues,
+ %% timer used to emit statistics
+ stats_timer,
+ %% are publisher confirms enabled for this channel?
+ confirm_enabled,
+ %% publisher confirm delivery tag sequence
+ publish_seqno,
+ %% a dtree used to track unconfirmed
+ %% (to publishers) messages
+ unconfirmed,
+ %% a list of tags for published messages that were
+ %% delivered but are yet to be confirmed to the client
+ confirmed,
+ %% a list of tags for published messages that were
+ %% rejected but are yet to be sent to the client
+ rejected,
+ %% used by "one shot RPC" (amq.
+ reply_consumer,
+ %% flow | noflow, see rabbitmq-server#114
+ delivery_flow,
+ interceptor_state,
+ queue_states,
+ tick_timer
+ }).
-define(QUEUE, lqueue).
@@ -483,39 +489,43 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Limiter0
end,
MaxMessageSize = get_max_message_size(),
- State = #ch{state = starting,
- protocol = Protocol,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- limiter = Limiter,
+ ConsumerTimeout = get_consumer_timeout(),
+ State = #ch{cfg = #conf{state = starting,
+ protocol = Protocol,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ user = User,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ queue_collector_pid = CollectorPid,
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost),
+ consumer_prefetch = Prefetch,
+ max_message_size = MaxMessageSize,
+ consumer_timeout = ConsumerTimeout
+ },
+ limiter = Limiter,
tx = none,
next_tag = 1,
unacked_message_q = ?QUEUE:new(),
- user = User,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
queue_names = #{},
queue_monitors = pmon:new(),
consumer_mapping = #{},
queue_consumers = #{},
delivering_queues = sets:new(),
- queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
unconfirmed = dtree:empty(),
rejected = [],
confirmed = [],
- capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = Prefetch,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
- queue_states = #{},
- max_message_size = MaxMessageSize},
+ queue_states = #{}
+ },
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
@@ -525,7 +535,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
rabbit_event:if_enabled(State2, #ch.stats_timer,
fun() -> emit_stats(State2) end),
put_operation_timeout(),
- State3 = init_queue_cleanup_timer(State2),
+ State3 = init_tick_timer(State2),
{ok, State3, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -568,8 +578,9 @@ handle_call({{info, Items}, Deadline}, _From, State) ->
reply({error, Error}, State)
end;
-handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
- reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+handle_call(refresh_config, _From,
+ State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) ->
+ reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}});
handle_call(refresh_interceptors, _From, State) ->
IState = rabbit_channel_interceptor:init(State),
@@ -590,7 +601,7 @@ handle_call(_Request, _From, State) ->
noreply(State).
handle_cast({method, Method, Content, Flow},
- State = #ch{reader_pid = Reader,
+ State = #ch{cfg = #conf{reader_pid = Reader},
interceptor_state = IState}) ->
case Flow of
%% We are going to process a message from the rabbit_reader
@@ -617,12 +628,13 @@ handle_cast({method, Method, Content, Flow},
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_cast(ready_for_close, State = #ch{state = closing,
- writer_pid = WriterPid}) ->
+handle_cast(ready_for_close,
+ State = #ch{cfg = #conf{state = closing,
+ writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
{stop, normal, State};
-handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
+handle_cast(terminate, State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:flush(WriterPid),
{stop, normal, State};
@@ -634,12 +646,14 @@ handle_cast({command, Msg}, State) ->
ok = send(Msg, State),
noreply(State);
-handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
+handle_cast({deliver, _CTag, _AckReq, _Msg},
+ State = #ch{cfg = #conf{state = closing}}) ->
noreply(State);
handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) ->
noreply(handle_deliver(ConsumerTag, AckRequired, Msg, State));
-handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) ->
+handle_cast({deliver_reply, _K, _Del},
+ State = #ch{cfg = #conf{state = closing}}) ->
noreply(State);
handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) ->
noreply(State);
@@ -647,8 +661,8 @@ handle_cast({deliver_reply, Key, #delivery{message =
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
+ State = #ch{cfg = #conf{writer_pid = WriterPid},
+ next_tag = DeliveryTag,
reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
ok = rabbit_writer:send_command(
WriterPid,
@@ -662,12 +676,14 @@ handle_cast({deliver_reply, Key, #delivery{message =
handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
noreply(State);
-handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({send_credit_reply, Len},
+ State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_ok'{available = Len}),
noreply(State);
-handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({send_drained, CTagCredit},
+ State = #ch{cfg = #conf{writer_pid = WriterPid}}) ->
[ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
credit_drained = CreditDrained})
@@ -734,7 +750,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
{internal, MsgSeqNos, Actions, QState1} ->
State = State0#ch{queue_states = maps:put(Name, QState1, QueueStates)},
%% execute actions
- WriterPid = State#ch.writer_pid,
+ WriterPid = State#ch.cfg#conf.writer_pid,
lists:foreach(fun ({send_credit_reply, Avail}) ->
ok = rabbit_writer:send_command(
WriterPid,
@@ -805,34 +821,45 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel})
+handle_info({{Ref, Node}, LateAnswer},
+ State = #ch{cfg = #conf{channel = Channel}})
when is_reference(Ref) ->
rabbit_log_channel:warning("Channel ~p ignoring late answer ~p from ~p",
[Channel, LateAnswer, Node]),
noreply(State);
-handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) ->
- QueueStates =
+handle_info(tick, State0 = #ch{queue_states = QueueStates0}) ->
+ QueueStates1 =
maps:filter(fun(_, QS) ->
- QName = rabbit_quorum_queue:queue_name(QS),
- [] /= rabbit_amqqueue:lookup(QName)
+ QName = rabbit_quorum_queue:queue_name(QS),
+ [] /= rabbit_amqqueue:lookup([QName])
end, QueueStates0),
- noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates}));
-
-handle_info({channel_source, Source}, State = #ch{}) ->
- noreply(State#ch{source = Source}).
+ case evaluate_consumer_timeout(State0#ch{queue_states = QueueStates1}) of
+ {noreply, State} ->
+ noreply(init_tick_timer(reset_tick_timer(State)));
+ Return ->
+ Return
+ end;
+handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) ->
+ noreply(State#ch{cfg = Cfg#conf{source = Source}}).
-handle_pre_hibernate(State) ->
+handle_pre_hibernate(State0) ->
ok = clear_permission_cache(),
+ State = maybe_cancel_tick_timer(State0),
rabbit_event:if_enabled(
State, #ch.stats_timer,
fun () -> emit_stats(State,
[{idle_since,
os:system_time(milli_seconds)}])
- end),
+ end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
-terminate(_Reason, State = #ch{user = #user{username = Username}}) ->
+handle_post_hibernate(State0) ->
+ State = init_tick_timer(State0),
+ {noreply, State}.
+
+terminate(_Reason,
+ State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
{_Res, _State1} = notify_queues(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:if_enabled(State, #ch.stats_timer,
@@ -857,6 +884,13 @@ get_max_message_size() ->
?MAX_MSG_SIZE
end.
+get_consumer_timeout() ->
+ case application:get_env(rabbit, consumer_timeout) of
+ {ok, MS} when is_integer(MS) ->
+ MS;
+ _ ->
+ undefined
+ end.
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
@@ -878,22 +912,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send(_Command, #ch{state = closing}) ->
+send(_Command, #ch{cfg = #conf{state = closing}}) ->
ok;
-send(Command, #ch{writer_pid = WriterPid}) ->
+send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(WriterPid, Command).
format_soft_error(#amqp_error{name = N, explanation = E, method = M}) ->
io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]).
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- virtual_host = VHost,
- user = User}) ->
+handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid,
+ conn_name = ConnName,
+ virtual_host = VHost,
+ user = User
+ }}) ->
%% something bad's happened: notify_queues may not be 'ok'
{_Result, State1} = notify_queues(State),
case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
@@ -920,11 +955,12 @@ precondition_failed(Format, Params) ->
rabbit_misc:protocol_error(precondition_failed, Format, Params).
return_queue_declare_ok(#resource{name = ActualName},
- NoWait, MessageCount, ConsumerCount, State) ->
- return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
- #'queue.declare_ok'{queue = ActualName,
- message_count = MessageCount,
- consumer_count = ConsumerCount}).
+ NoWait, MessageCount, ConsumerCount,
+ #ch{cfg = Cfg} = State) ->
+ return_ok(State#ch{cfg = Cfg#conf{most_recently_declared_queue = ActualName}},
+ NoWait, #'queue.declare_ok'{queue = ActualName,
+ message_count = MessageCount,
+ consumer_count = ConsumerCount}).
check_resource_access(User, Resource, Perm) ->
V = {Resource, Perm},
@@ -962,15 +998,15 @@ check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) ->
check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
ok;
check_user_id_header(#'P_basic'{user_id = Username},
- #ch{user = #user{username = Username}}) ->
+ #ch{cfg = #conf{user = #user{username = Username}}}) ->
ok;
check_user_id_header(
- #'P_basic'{}, #ch{user = #user{authz_backends =
- [{rabbit_auth_backend_dummy, _}]}}) ->
+ #'P_basic'{}, #ch{cfg = #conf{user = #user{authz_backends =
+ [{rabbit_auth_backend_dummy, _}]}}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual,
- tags = Tags}}) ->
+ #ch{cfg = #conf{user = #user{username = Actual,
+ tags = Tags}}}) ->
case lists:member(impersonator, Tags) of
true -> ok;
false -> precondition_failed(
@@ -1079,18 +1115,18 @@ qbin_to_resource(QueueNameBin, VHostPath) ->
name_to_resource(Type, NameBin, VHostPath) ->
rabbit_misc:r(VHostPath, Type, NameBin).
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
+expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) ->
rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
+expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = <<>>}) ->
+ #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) ->
rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = MRDQ}) ->
+ #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
@@ -1178,9 +1214,10 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
handle_method({Method, Content}, State) ->
handle_method(Method, Content, State).
-handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
+handle_method(#'channel.open'{}, _,
+ State = #ch{cfg = #conf{state = starting} = Cfg}) ->
%% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
- State1 = State#ch{state = running},
+ State1 = State#ch{cfg = Cfg#conf{state = running}},
rabbit_event:if_enabled(State1, #ch.stats_timer,
fun() -> emit_stats(State1) end),
{reply, #'channel.open_ok'{}, State1};
@@ -1189,21 +1226,23 @@ handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
channel_error, "second 'channel.open' seen", []);
-handle_method(_Method, _, #ch{state = starting}) ->
+handle_method(_Method, _, #ch{cfg = #conf{state = starting}}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
+handle_method(#'channel.close_ok'{}, _, #ch{cfg = #conf{state = closing}}) ->
stop;
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
- state = closing}) ->
+handle_method(#'channel.close'{}, _,
+ State = #ch{cfg = #conf{state = closing,
+ writer_pid = WriterPid}}) ->
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
{noreply, State};
-handle_method(_Method, _, State = #ch{state = closing}) ->
+handle_method(_Method, _, State = #ch{cfg = #conf{state = closing}}) ->
{noreply, State};
-handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
+handle_method(#'channel.close'{}, _,
+ State = #ch{cfg = #conf{reader_pid = ReaderPid}}) ->
{_Result, State1} = notify_queues(State),
%% We issue the channel.close_ok response after a handshake with
%% the reader, the other half of which is ready_for_close. That
@@ -1235,17 +1274,19 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
+ Content, State = #ch{cfg = #conf{channel = ChannelNum,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ conn_name = ConnName,
+ virtual_host = VHostPath,
+ user = #user{username = Username} = User,
+ trace_state = TraceState,
+ max_message_size = MaxMessageSize
+ },
tx = Tx,
- channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username} = User,
- conn_name = ConnName,
- delivery_flow = Flow,
- conn_pid = ConnPid,
- source = ChSrc,
- max_message_size = MaxMessageSize}) ->
+ delivery_flow = Flow
+ }) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
@@ -1300,12 +1341,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
end};
handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- limiter = Limiter,
+ _, State = #ch{cfg = #conf{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ user = User,
+ virtual_host = VHostPath
+ },
+ limiter = Limiter,
next_tag = DeliveryTag,
- user = User,
- virtual_host = VHostPath,
queue_states = QueueStates0}) ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
check_read_permitted(QueueName, User),
@@ -1385,10 +1427,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{consumer_prefetch = ConsumerPrefetch,
- consumer_mapping = ConsumerMapping,
- user = User,
- virtual_host = VHostPath}) ->
+ _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
+ user = User,
+ virtual_host = VHostPath},
+ consumer_mapping = ConsumerMapping
+ }) ->
case maps:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
@@ -1420,9 +1463,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
end;
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping,
+ _, State = #ch{cfg = #conf{user = #user{username = Username}},
+ consumer_mapping = ConsumerMapping,
queue_consumers = QCons,
- user = #user{username = Username},
queue_states = QueueStates0}) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case maps:find(ConsumerTag, ConsumerMapping) of
@@ -1470,10 +1513,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
handle_method(#'basic.qos'{global = false,
prefetch_count = PrefetchCount},
- _, State = #ch{limiter = Limiter}) ->
+ _, State = #ch{cfg = Cfg,
+ limiter = Limiter}) ->
%% Ensures that if default was set, it's overridden
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount,
+ {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount},
limiter = Limiter1}};
handle_method(#'basic.qos'{global = true,
@@ -1531,87 +1575,87 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
reject(DeliveryTag, Requeue, false, State);
handle_method(#'exchange.declare'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- user = User,
- queue_collector_pid = CollectorPid,
- conn_pid = ConnPid,
- source = ChSrc}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ conn_pid = ConnPid,
+ source = ChSrc}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{nowait = NoWait} = Method,
- _, State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- virtual_host = VHostPath,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.delete_ok'{});
handle_method(#'exchange.bind'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.bind_ok'{});
handle_method(#'exchange.unbind'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'exchange.unbind_ok'{});
handle_method(#'queue.declare'{nowait = NoWait} = Method,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- source = ChSrc,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ _, State = #ch{cfg = #conf{virtual_host = VHostPath,
+ conn_pid = ConnPid,
+ source = ChSrc,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
{ok, QueueName, MessageCount, ConsumerCount} =
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
handle_method(#'queue.delete'{nowait = NoWait} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- virtual_host = VHostPath,
- queue_collector_pid = CollectorPid,
- user = User}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ virtual_host = VHostPath,
+ queue_collector_pid = CollectorPid,
+ user = User}}) ->
{ok, PurgedMessageCount} =
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount});
handle_method(#'queue.bind'{nowait = NoWait} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, NoWait, #'queue.bind_ok'{});
handle_method(#'queue.unbind'{} = Method, _,
- State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User),
return_ok(State, false, #'queue.unbind_ok'{});
handle_method(#'queue.purge'{nowait = NoWait} = Method,
- _, State = #ch{conn_pid = ConnPid,
- source = ChSrc,
- user = User,
- queue_collector_pid = CollectorPid,
- virtual_host = VHostPath}) ->
+ _, State = #ch{cfg = #conf{conn_pid = ConnPid,
+ source = ChSrc,
+ user = User,
+ queue_collector_pid = CollectorPid,
+ virtual_host = VHostPath}}) ->
case handle_method(Method, ConnPid, ChSrc, CollectorPid,
VHostPath, User) of
{ok, PurgedMessageCount} ->
@@ -1690,11 +1734,11 @@ handle_method(_MethodRecord, _Content, _State) ->
%% for why.
basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait,
- State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
+ State = #ch{cfg = #conf{conn_pid = ConnPid,
+ user = #user{username = Username}},
+ limiter = Limiter,
consumer_mapping = ConsumerMapping,
- user = #user{username = Username},
- queue_states = QueueStates0}) ->
+ queue_states = QueueStates0}) ->
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) ->
@@ -1814,8 +1858,9 @@ handle_consuming_queue_down_or_eol(QRef,
%% not an HA failover. But the likelihood is not great and most users
%% are unlikely to care.
-cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
- consumer_mapping = CMap}) ->
+cancel_consumer(CTag, QName,
+ State = #ch{cfg = #conf{capabilities = Capabilities},
+ consumer_mapping = CMap}) ->
case rabbit_misc:table_lookup(
Capabilities, <<"consumer_cancel_notify">>) of
{bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
@@ -1885,7 +1930,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0,
basic_return(#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content},
- State = #ch{protocol = Protocol, writer_pid = WriterPid},
+ State = #ch{cfg = #conf{protocol = Protocol,
+ writer_pid = WriterPid}},
Reason) ->
?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
@@ -1920,12 +1966,14 @@ internal_reject(Requeue, Acked, Limiter,
record_sent(Type, Tag, AckRequired,
Msg = {QName, QPid, MsgId, Redelivered, _Message},
- State = #ch{unacked_message_q = UAMQ,
- next_tag = DeliveryTag,
- trace_state = TraceState,
- user = #user{username = Username},
- conn_name = ConnName,
- channel = ChannelNum}) ->
+ State = #ch{cfg = #conf{channel = ChannelNum,
+ trace_state = TraceState,
+ user = #user{username = Username},
+ conn_name = ConnName
+ },
+ unacked_message_q = UAMQ,
+ next_tag = DeliveryTag
+ }) ->
?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of
{get, true} -> get;
{get, false} -> get_no_ack;
@@ -1936,10 +1984,11 @@ record_sent(Type, Tag, AckRequired,
true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
end,
+ DeliveredAt = os:system_time(millisecond),
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
UAMQ1 = case AckRequired of
- true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}},
- UAMQ);
+ true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt,
+ {QPid, MsgId}}, UAMQ);
false -> UAMQ
end,
State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
@@ -1952,7 +2001,7 @@ collect_acks(Q, DeliveryTag, Multiple) ->
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
case ?QUEUE:out(Q) of
- {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
+ {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}},
QTail} ->
if CurrentDeliveryTag == DeliveryTag ->
{[UnackedMsg | ToAcc],
@@ -2007,9 +2056,10 @@ incr_queue_stats(QPid, QNames, MsgIds, State) ->
%% ack first" order.
new_tx() -> {?QUEUE:new(), []}.
-notify_queues(State = #ch{state = closing}) ->
+notify_queues(State = #ch{cfg = #conf{state = closing}}) ->
{ok, State};
notify_queues(State = #ch{consumer_mapping = Consumers,
+ cfg = Cfg,
delivering_queues = DQ }) ->
QRefs0 = sets:to_list(
sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)),
@@ -2017,15 +2067,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)],
Timeout = get_operation_timeout(),
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
- State#ch{state = closing}}.
+ State#ch{cfg = Cfg#conf{state = closing}}}.
foreach_per_queue(_F, [], Acc) ->
Acc;
-foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) ->
+foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) ->
%% quorum queue, needs the consumer tag
F({QPid, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
- T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
+ T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
@@ -2061,8 +2111,9 @@ notify_limiter(Limiter, Acked) ->
deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
- []}, State) -> %% optimisation
+ _RoutedToQs = []}, State) -> %% optimisation
?INCR_STATS(exchange_stats, XName, 1, publish, State),
+ ?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
@@ -2115,9 +2166,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
end,
State2#ch{queue_states = QueueStates}.
-process_routing_mandatory(true, [], Msg, State) ->
+process_routing_mandatory(_Mandatory = true,
+ _RoutedToQs = [],
+ Msg, State) ->
ok = basic_return(Msg, State, no_route),
ok;
+process_routing_mandatory(_Mandatory = false,
+ _RoutedToQs = [],
+ #basic_message{exchange_name = ExchangeName}, State) ->
+ ?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State),
+ ok;
process_routing_mandatory(_, _, _, _) ->
ok.
@@ -2167,7 +2225,7 @@ send_confirms_and_nacks(State) ->
send_nacks([], _, State) ->
State;
-send_nacks(_Rs, _, State = #ch{state = closing}) -> %% optimisation
+send_nacks(_Rs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation
State;
send_nacks(Rs, Cs, State) ->
coalesce_and_send(Rs, Cs,
@@ -2178,7 +2236,7 @@ send_nacks(Rs, Cs, State) ->
send_confirms([], _, State) ->
State;
-send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation
+send_confirms(_Cs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation
State;
send_confirms([MsgSeqNo], _, State) ->
ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
@@ -2243,14 +2301,14 @@ infos(Items, Deadline, State) ->
end || Item <- Items].
i(pid, _) -> self();
-i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
+i(connection, #ch{cfg = #conf{conn_pid = ConnPid}}) -> ConnPid;
+i(number, #ch{cfg = #conf{channel = Channel}}) -> Channel;
+i(user, #ch{cfg = #conf{user = User}}) -> User#user.username;
i(user_who_performed_action, Ch) -> i(user, Ch);
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
+i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost;
i(transactional, #ch{tx = Tx}) -> Tx =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(source, #ch{source = ChSrc}) -> ChSrc;
+i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM);
i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
@@ -2259,9 +2317,9 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
-i(state, #ch{state = running}) -> credit_flow:state();
-i(state, #ch{state = State}) -> State;
-i(prefetch_count, #ch{consumer_prefetch = C}) -> C;
+i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state();
+i(state, #ch{cfg = #conf{state = State}}) -> State;
+i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C;
i(global_prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(interceptors, #ch{interceptor_state = IState}) ->
@@ -2274,7 +2332,7 @@ i(reductions, _State) ->
i(Item, _) ->
throw({bad_argument, Item}).
-name(#ch{conn_name = ConnName, channel = Channel}) ->
+name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
emit_stats(State) -> emit_stats(State, []).
@@ -2297,9 +2355,9 @@ erase_queue_stats(QName) ->
end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
QName0 =:= QName].
-get_vhost(#ch{virtual_host = VHost}) -> VHost.
+get_vhost(#ch{cfg = #conf{virtual_host = VHost}}) -> VHost.
-get_user(#ch{user = User}) -> User.
+get_user(#ch{cfg = #conf{user = User}}) -> User.
delete_stats({queue_stats, QName}) ->
rabbit_core_metrics:channel_queue_down({self(), QName});
@@ -2559,7 +2617,7 @@ handle_deliver(ConsumerTag, AckRequired,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}},
- State = #ch{writer_pid = WriterPid,
+ State = #ch{cfg = #conf{writer_pid = WriterPid},
next_tag = DeliveryTag}) ->
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
@@ -2592,9 +2650,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
State1 = track_delivering_queue(NoAck, QPid, QName, State),
{noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}.
-init_queue_cleanup_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
- State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.
+init_tick_timer(State = #ch{tick_timer = undefined}) ->
+ {ok, Interval} = application:get_env(rabbit, channel_tick_interval),
+ State#ch{tick_timer = erlang:send_after(Interval, self(), tick)};
+init_tick_timer(State) ->
+ State.
+
+reset_tick_timer(State) ->
+ State#ch{tick_timer = undefined}.
+
+maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) ->
+ State;
+maybe_cancel_tick_timer(#ch{tick_timer = TRef,
+ unacked_message_q = UMQ} = State) ->
+ case ?QUEUE:len(UMQ) of
+ 0 ->
+ %% we can only cancel the tick timer if the unacked messages
+ %% queue is empty.
+ _ = erlang:cancel_timer(TRef),
+ State#ch{tick_timer = undefined};
+ _ ->
+ %% let the timer continue
+ State
+ end.
%% only classic queues need monitoring so rather than special casing
%% everywhere monitors are set up we wrap it here for this module
@@ -2612,7 +2690,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) ->
add_delivery_count_header(_, Msg) ->
Msg.
-qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
+qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
qpid_to_ref({Name, _}) -> Name;
%% assume it already is a ref
qpid_to_ref(Ref) -> Ref.
@@ -2632,3 +2710,60 @@ queue_fold(Fun, Init, Q) ->
{empty, _Q} -> Init;
{{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1)
end.
+
+evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
+ capabilities = Capabilities,
+ consumer_timeout = Timeout},
+ queue_names = QNames,
+ queue_consumers = QCons,
+ unacked_message_q = UAMQ}) ->
+ Now = os:system_time(millisecond),
+ case ?QUEUE:peek(UAMQ) of
+ {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}}
+ when is_integer(Timeout)
+ andalso Time < Now - Timeout ->
+ rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out "
+ "waiting on ack",
+ [rabbit_data_coercion:to_binary(ConsumerTag),
+ Channel]),
+ SupportsCancel = case rabbit_misc:table_lookup(
+ Capabilities,
+ <<"consumer_cancel_notify">>) of
+ {bool, true} when is_binary(ConsumerTag) ->
+ true;
+ _ -> false
+ end,
+ case SupportsCancel of
+ false ->
+ Ex = rabbit_misc:amqp_error(precondition_failed,
+ "consumer ack timed out on channel ~w",
+ [Channel], none),
+ handle_exception(Ex, State0);
+ true ->
+ QRef = qpid_to_ref(QPid),
+ QName = maps:get(QRef, QNames),
+ %% cancel the consumer with the client
+ State2 = cancel_consumer(ConsumerTag, QName, State0),
+ [Q] = rabbit_amqqueue:lookup([QName]),
+ %% send basic cancel to the queue
+ {ok, QueueStates2} = rabbit_amqqueue:basic_cancel(
+ Q, self(), ConsumerTag, undefined,
+ <<"broker">>, State2#ch.queue_states),
+ %% return all in-flight messages for the consumer
+ {MsgIds, Rem} = lists:foldl(
+ fun({_DelTag, ConTag, _Time, {_, MsgId}},
+ {Ids, Rem}) when ConTag == ConsumerTag ->
+ {[MsgId | Ids], Rem};
+ (Unacked, {Ids, Rem}) ->
+ {Ids, ?QUEUE:in(Unacked, Rem)}
+ end, {[], ?QUEUE:new()},
+ ?QUEUE:to_list(UAMQ)),
+ QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds},
+ self(), QueueStates2),
+ {noreply, State2#ch{queue_states = QueueStates,
+ queue_consumers = maps:remove(QRef, QCons),
+ unacked_message_q = Rem}}
+ end;
+ _ ->
+ {noreply, State0}
+ end.
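
As a reading aid (not part of the patch), the check applied above to the head of unacked_message_q can be written as a standalone predicate; DeliveredAt is the os:system_time(millisecond) timestamp now recorded per unacknowledged delivery in record_sent/5, and Timeout is the consumer_timeout application setting (undefined when unset):

%% hypothetical helper, for illustration only
delivery_timed_out(_DeliveredAt, undefined) ->
    false;
delivery_timed_out(DeliveredAt, Timeout) when is_integer(Timeout) ->
    DeliveredAt < os:system_time(millisecond) - Timeout.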
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index f4980aec7d..99ad8eef34 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -157,7 +157,7 @@ gc_process_and_entity(Table, GbSet) ->
ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _, _}, none)
when Table == channel_queue_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
- ({{Pid, Id} = Key, _, _, _, _}, none)
+ ({{Pid, Id} = Key, _, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{Id, Pid, _} = Key, _, _, _, _, _, _}, none)
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index 7368f2b8a2..e20a8f2f72 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -20,13 +20,72 @@
-export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]).
-export([peer_cert_subject_items/2, peer_cert_auth_name/1]).
+-export([cipher_suites_erlang/2, cipher_suites_erlang/1,
+ cipher_suites_openssl/2, cipher_suites_openssl/1,
+ cipher_suites/1]).
%%--------------------------------------------------------------------------
-export_type([certificate/0]).
+% Due to API differences between OTP releases.
+-dialyzer(no_missing_calls).
+-ignore_xref([{ssl_cipher_format, erl_suite_definition, 1},
+ {ssl_cipher_format, suite, 1},
+ {ssl_cipher_format, openssl_suite_name, 1}]).
+
-type certificate() :: rabbit_cert_info:certificate().
+-type cipher_suites_mode() :: default | all | anonymous.
+
+-spec cipher_suites(cipher_suites_mode()) -> ssl:ciphers().
+cipher_suites(Mode) ->
+ Version = get_highest_protocol_version(),
+ ssl:cipher_suites(Mode, Version).
+
+-spec cipher_suites_erlang(cipher_suites_mode()) ->
+ [ssl:old_cipher_suite()].
+cipher_suites_erlang(Mode) ->
+ Version = get_highest_protocol_version(),
+ cipher_suites_erlang(Mode, Version).
+
+-spec cipher_suites_erlang(cipher_suites_mode(),
+ ssl:protocol_version() | tls_record:tls_version()) ->
+ [ssl:old_cipher_suite()].
+cipher_suites_erlang(Mode, Version) ->
+ [ format_cipher_erlang(C)
+ || C <- ssl:cipher_suites(Mode, Version) ].
+
+-spec cipher_suites_openssl(cipher_suites_mode()) ->
+ [ssl:old_cipher_suite()].
+cipher_suites_openssl(Mode) ->
+ Version = get_highest_protocol_version(),
+ cipher_suites_openssl(Mode, Version).
+
+-spec cipher_suites_openssl(cipher_suites_mode(),
+ ssl:protocol_version() | tls_record:tls_version()) ->
+ [ssl:old_cipher_suite()].
+cipher_suites_openssl(Mode, Version) ->
+ lists:filtermap(fun(C) ->
+ OpenSSL = format_cipher_openssl(C),
+ case is_list(OpenSSL) of
+ true -> {true, OpenSSL};
+ false -> false
+ end
+ end,
+ ssl:cipher_suites(Mode, Version)).
+
+
+format_cipher_erlang(Cipher) ->
+ ssl_cipher_format:erl_suite_definition(ssl_cipher_format:suite(Cipher)).
+
+format_cipher_openssl(Cipher) ->
+ ssl_cipher_format:openssl_suite_name(ssl_cipher_format:suite(Cipher)).
+
+-spec get_highest_protocol_version() -> tls_record:tls_version().
+get_highest_protocol_version() ->
+ tls_record:highest_protocol_version([]).
+
%%--------------------------------------------------------------------------
%% High-level functions used by reader
%%--------------------------------------------------------------------------
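
A possible usage sketch (not part of this commit) for the new cipher suite helpers, e.g. from a shell attached to the node; the exact return format depends on the underlying OTP release:

Native  = rabbit_ssl:cipher_suites(default),            %% OTP-native suite representation
Legacy  = rabbit_ssl:cipher_suites_erlang(all),         %% old-style Erlang suite terms
OpenSSL = rabbit_ssl:cipher_suites_openssl(default).    %% OpenSSL-style suite names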
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 702d96c7dc..64abbe56ff 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -37,6 +37,7 @@ memory() ->
MnesiaETS = mnesia_memory(),
MsgIndexETS = ets_memory(msg_stores()),
MetricsETS = ets_memory([rabbit_metrics]),
+ QuorumETS = ets_memory([ra_log_ets]),
MetricsProc = try
[{_, M}] = process_info(whereis(rabbit_metrics), [memory]),
M
@@ -87,7 +88,8 @@ memory() ->
%% ETS
{mnesia, MnesiaETS},
- {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS},
+ {quorum_ets, QuorumETS},
+ {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS},
%% Messages (mostly, some binaries are not messages)
{binary, Bin},
diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl
new file mode 100644
index 0000000000..8817b93c03
--- /dev/null
+++ b/test/consumer_timeout_SUITE.erl
@@ -0,0 +1,272 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(consumer_timeout_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [consumer_timeout,
+ consumer_timeout_basic_get,
+ consumer_timeout_no_basic_cancel_capability
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests},
+ {mirrored_queue, [parallel], AllTests},
+ {quorum_queue, [parallel], AllTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config0) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000},
+ {consumer_timeout, 5000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config0, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+consumer_timeout(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% doing nothing with the delivery should trigger the timeout
+ receive
+ #'basic.cancel'{ } ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ ok
+ after 20000 ->
+ flush(1),
+ exit(cancel_never_happened)
+ end
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
+
+consumer_timeout_basic_get(Config) ->
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_DelTag] = consume(Ch, QName, [<<"msg1">>]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+
+
+-define(CLIENT_CAPABILITIES,
+ [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, false},
+ {<<"connection.blocked">>, bool, true},
+ {<<"authentication_failure_close">>, bool, true}]).
+
+consumer_timeout_no_basic_cancel_capability(Config) ->
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
+ AmqpParams = #amqp_params_network{port = Port,
+ host = "localhost",
+ virtual_host = <<"/">>,
+ client_properties = Props
+ },
+ {ok, Conn} = amqp_connection:start(AmqpParams),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ erlang:monitor(process, Conn),
+ erlang:monitor(process, Ch),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = _,
+ redelivered = false}, _} ->
+ %% doing nothing with the delivery should trigger the timeout
+ ok
+ after 5000 ->
+ exit(deliver_timeout)
+ end,
+ receive
+ {'DOWN', _, process, Ch, _} -> ok
+ after 30000 ->
+ flush(1),
+ exit(channel_exit_expected)
+ end,
+ receive
+ {'DOWN', _, process, Conn, _} ->
+ flush(1),
+ exit(unexpected_connection_exit)
+ after 2000 ->
+ ok
+ end,
+ ok.
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = Ctag},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
+ ok
+ end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index eba0965608..632a314d21 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -58,7 +58,7 @@ groups() ->
delete_immediately_by_resource
],
[
- {parallel_tests, [],
+ {parallel_tests, [],
[
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
@@ -129,19 +129,22 @@ init_per_group(mirrored_queue, Config) ->
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
-init_per_group(Group, Config) ->
+init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
ClusterSize = 2,
- Config1 = rabbit_ct_helpers:set_config(Config, [
- {rmq_nodename_suffix, Group},
- {rmq_nodes_count, ClusterSize}
- ]),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{channel_tick_interval, 1000},
+ {quorum_tick_interval, 1000}]}),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps());
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
false ->
- rabbit_ct_helpers:run_steps(Config, [])
+ rabbit_ct_helpers:run_steps(Config0, [])
end.
end_per_group(Group, Config) ->
@@ -193,7 +196,7 @@ consume_first_empty(Config) ->
consume_empty(Ch, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- consume(Ch, QName, [<<"msg1">>]),
+ consume(Ch, QName, true, [<<"msg1">>]),
rabbit_ct_client_helpers:close_channel(Ch).
consume_from_empty_queue(Config) ->
@@ -268,7 +271,9 @@ consume_and_ack(Config) ->
[DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -281,7 +286,9 @@ consume_and_multiple_ack(Config) ->
wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -296,7 +303,9 @@ subscribe_and_ack(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_ack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -314,7 +323,9 @@ subscribe_and_multiple_ack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -343,7 +354,9 @@ subscribe_and_requeue_multiple_nack(Config) ->
multiple = true}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -357,7 +370,9 @@ consume_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -371,7 +386,9 @@ consume_and_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_requeue_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -385,7 +402,9 @@ consume_and_requeue_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = true}),
- wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
consume_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -399,7 +418,9 @@ consume_and_multiple_nack(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = true,
requeue = false}),
- wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_requeue_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -423,7 +444,9 @@ subscribe_and_requeue_nack(Config) ->
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
end
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -441,7 +464,9 @@ subscribe_and_nack(Config) ->
multiple = false,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
subscribe_and_multiple_nack(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -461,7 +486,9 @@ subscribe_and_multiple_nack(Config) ->
multiple = true,
requeue = false}),
wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%% TODO test with single active
basic_cancel(Config) ->
@@ -472,11 +499,12 @@ basic_cancel(Config) ->
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
- subscribe(Ch, QName, false),
+ CTag = atom_to_binary(?FUNCTION_NAME, utf8),
+ subscribe(Ch, QName, false, CTag),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
?assertEqual([], lists:filter(fun(Props) ->
@@ -489,7 +517,9 @@ basic_cancel(Config) ->
wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
after 5000 ->
exit(basic_deliver_timeout)
- end.
+ end,
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
purge(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -501,7 +531,9 @@ purge(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
{'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
basic_recover(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -513,7 +545,9 @@ basic_recover(Config) ->
[_] = consume(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_fails(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -531,7 +565,9 @@ delete_immediately_by_pid_fails(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_pid_succeeds(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -550,7 +586,9 @@ delete_immediately_by_pid_succeeds(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
delete_immediately_by_resource(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
@@ -568,7 +606,9 @@ delete_immediately_by_resource(Config) ->
durable = Durable,
passive = true,
auto_delete = false,
- arguments = Args})).
+ arguments = Args})),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ ok.
%%%%%%%%%%%%%%%%%%%%%%%%
%% Test helpers
@@ -600,12 +640,15 @@ consume_empty(Ch, QName) ->
amqp_channel:call(Ch, #'basic.get'{queue = QName})).
subscribe(Ch, Queue, NoAck) ->
+ subscribe(Ch, Queue, NoAck, <<"ctag">>).
+
+subscribe(Ch, Queue, NoAck, Ctag) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
- consumer_tag = <<"ctag">>},
+ consumer_tag = Ctag},
self()),
receive
- #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ #'basic.consume_ok'{consumer_tag = Ctag} ->
ok
end.
@@ -614,3 +657,11 @@ receive_basic_deliver(Redelivered) ->
{#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
ok
end.
+
+flush(T) ->
+ receive X ->
+ ct:pal("flushed ~w", [X]),
+ flush(T)
+ after T ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 61f9328855..c23b7ac85e 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -172,7 +172,7 @@ init_per_group(Group, Config) ->
ok ->
ok = rabbit_ct_broker_helpers:rpc(
Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
+ [rabbit, channel_tick_interval, 100]),
%% HACK: the larger cluster sizes benefit from a bit more time
%% after clustering before running the tests.
case Group of
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index ea6e973ca2..c44b799caa 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -177,6 +177,8 @@ channel_metrics(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}),
amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>},
#amqp_msg{payload = <<"hello">>}),
+    amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $¢% anywhere">>},
+ #amqp_msg{payload = <<"hello">>}),
{#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>,
no_ack=true}),
timer:sleep(150),
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index 866d529489..ed64fcf1c5 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -39,7 +39,8 @@ groups() ->
file_handle_cache, %% Change FHC limit.
head_message_timestamp_statistics, %% Expect specific statistics.
log_management, %% Check log files.
- log_management_during_startup, %% Check log files.
+ log_file_initialised_during_startup,
+ log_file_fails_to_initialise_during_startup,
externally_rotated_logs_are_automatically_reopened %% Check log files.
]}
].
@@ -271,11 +272,11 @@ log_management1(_Config) ->
ok = test_logs_working([LogFile]),
passed.
-log_management_during_startup(Config) ->
+log_file_initialised_during_startup(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
- ?MODULE, log_management_during_startup1, [Config]).
+ ?MODULE, log_file_initialised_during_startup1, [Config]).
-log_management_during_startup1(_Config) ->
+log_file_initialised_during_startup1(_Config) ->
[LogFile|_] = rabbit:log_locations(),
Suffix = ".0",
@@ -299,57 +300,72 @@ log_management_during_startup1(_Config) ->
application:unset_env(lager, extra_sinks),
ok = rabbit:start(),
- %% start application with logging to directory with no
- %% write permissions
- ok = rabbit:stop(),
- NoPermission1 = "/var/empty/test.log",
- delete_file(NoPermission1),
- delete_file(filename:dirname(NoPermission1)),
- ok = rabbit:stop(),
- ok = application:set_env(rabbit, lager_default_file, NoPermission1),
+ %% clean up
+ ok = application:set_env(rabbit, lager_default_file, LogFile),
application:unset_env(rabbit, log),
application:unset_env(lager, handlers),
application:unset_env(lager, extra_sinks),
- ok = try rabbit:start() of
+ ok = rabbit:start(),
+ passed.
+
+
+log_file_fails_to_initialise_during_startup(Config) ->
+ passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+ ?MODULE, log_file_fails_to_initialise_during_startup1, [Config]).
+
+log_file_fails_to_initialise_during_startup1(_Config) ->
+ [LogFile|_] = rabbit:log_locations(),
+ Suffix = ".0",
+
+    %% start the application with logging to a directory with no
+    %% write permissions
+ ok = rabbit:stop(),
+
+ Run1 = fun() ->
+ NoPermission1 = "/var/empty/test.log",
+ delete_file(NoPermission1),
+ delete_file(filename:dirname(NoPermission1)),
+ ok = rabbit:stop(),
+ ok = application:set_env(rabbit, lager_default_file, NoPermission1),
+ application:unset_env(rabbit, log),
+ application:unset_env(lager, handlers),
+ application:unset_env(lager, extra_sinks),
+ rabbit:start()
+ end,
+
+ ok = try Run1() of
ok -> exit({got_success_but_expected_failure,
log_rotation_no_write_permission_dir_test})
catch
- _:{error, {cannot_log_to_file, _, Reason1}}
- when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok;
- _:{error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason1}}}
- when Reason1 =:= eperm orelse
- Reason1 =:= eacces orelse
- Reason1 =:= enoent-> ok
+ _:could_not_initialise_logger -> ok
end,
%% start application with logging to a subdirectory whose
%% parent directory has no write permissions
NoPermission2 = "/var/empty/non-existent/test.log",
- delete_file(NoPermission2),
- delete_file(filename:dirname(NoPermission2)),
- case rabbit:stop() of
- ok -> ok;
- {error, lager_not_running} -> ok
+
+ Run2 = fun() ->
+ delete_file(NoPermission2),
+ delete_file(filename:dirname(NoPermission2)),
+ case rabbit:stop() of
+ ok -> ok;
+ {error, lager_not_running} -> ok
+ end,
+ ok = application:set_env(rabbit, lager_default_file, NoPermission2),
+ application:unset_env(rabbit, log),
+ application:unset_env(lager, handlers),
+ application:unset_env(lager, extra_sinks),
+ rabbit:start()
end,
- ok = application:set_env(rabbit, lager_default_file, NoPermission2),
- application:unset_env(rabbit, log),
- application:unset_env(lager, handlers),
- application:unset_env(lager, extra_sinks),
- ok = try rabbit:start() of
+
+ ok = try Run2() of
ok -> exit({got_success_but_expected_failure,
log_rotation_parent_dirs_test})
catch
- _:{error, {cannot_log_to_file, _, Reason2}}
- when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok;
- _:{error, {cannot_log_to_file, _,
- {cannot_create_parent_dirs, _, Reason2}}}
- when Reason2 =:= eperm orelse
- Reason2 =:= eacces orelse
- Reason2 =:= enoent-> ok
+ _:could_not_initialise_logger -> ok
end,
- %% cleanup
+ %% clean up
ok = application:set_env(rabbit, lager_default_file, LogFile),
application:unset_env(rabbit, log),
application:unset_env(lager, handlers),
@@ -494,7 +510,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 0}] = ets:lookup(
@@ -509,7 +525,7 @@ channel_statistics1(_Config) ->
[{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup(
channel_queue_metrics,
{Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[{{Ch, {QRes, X}}, 1, 1}] = ets:lookup(
@@ -522,7 +538,7 @@ channel_statistics1(_Config) ->
force_metric_gc(),
Check4 = fun() ->
[] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
- [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup(
channel_exchange_metrics,
{Ch, X}),
[] = ets:lookup(channel_queue_exchange_metrics,