diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2019-04-25 21:10:52 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2019-04-25 21:10:52 +0300 |
commit | 99d0adebb332fa45e9507e7b8d5a73bd4ab34338 (patch) | |
tree | bf4e41bbf82592e5114901362a667d3fe7166894 | |
parent | d279ccfee32764ad76517fb101015fa7ec77d9f5 (diff) | |
parent | e237612ae74990aff612a79b617d1dad438aaf51 (diff) | |
download | rabbitmq-server-git-99d0adebb332fa45e9507e7b8d5a73bd4ab34338.tar.gz |
Merge branch 'master' into rabbitmq-erlang-client-91
-rw-r--r-- | Makefile | 3 | ||||
-rw-r--r-- | src/rabbit.erl | 33 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 673 | ||||
-rw-r--r-- | src/rabbit_core_metrics_gc.erl | 2 | ||||
-rw-r--r-- | src/rabbit_ssl.erl | 59 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 4 | ||||
-rw-r--r-- | test/consumer_timeout_SUITE.erl | 272 | ||||
-rw-r--r-- | test/queue_parallel_SUITE.erl | 115 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 2 | ||||
-rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 2 | ||||
-rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 98 |
12 files changed, 907 insertions, 360 deletions
@@ -126,7 +126,8 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000}, + %% interval at which the channel can perform periodic actions + {channel_tick_interval, 60000}, %% Default max message size is 128 MB {max_message_size, 134217728} ] diff --git a/src/rabbit.erl b/src/rabbit.erl index 09b0494ce1..4a974fd682 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -320,13 +320,11 @@ ensure_config() -> case rabbit_config:validate_config_files() of ok -> ok; {error, {ErrFmt, ErrArgs}} -> - log_boot_error_and_exit(check_config_file, ErrFmt, ErrArgs) + throw({error, {check_config_file, ErrFmt, ErrArgs}}) end, case rabbit_config:prepare_and_use_config() of {error, {generation_error, Error}} -> - log_boot_error_and_exit(generate_config_file, - "~nConfig file generation failed ~s", - Error); + throw({error, {generate_config_file, Error}}); ok -> ok end. @@ -463,9 +461,8 @@ start_it(StartFun) -> true -> ok; false -> StartFun() end - catch - Class:Reason -> - boot_error(Class, Reason) + catch Class:Reason -> + boot_error(Class, Reason) after unlink(Marker), Marker ! stop, @@ -1018,11 +1015,23 @@ boot_error(_, {could_not_start, rabbit, {{timeout_waiting_for_tables, _}, _}}) - log_boot_error_and_exit( timeout_waiting_for_tables, "~n" ++ Err ++ rabbit_nodes:diagnostics(Nodes), []); -boot_error(Class, {error, {cannot_log_to_file, _, _}} = Reason) -> - log_boot_error_and_exit( - Reason, - "~nError description:~s", - [lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})]); +boot_error(_, {error, {cannot_log_to_file, unknown, Reason}}) -> + log_boot_error_and_exit(could_not_initialise_logger, + "failed to initialised logger: ~p~n", + [Reason]); +boot_error(_, {error, {cannot_log_to_file, LogFile, + {cannot_create_parent_dirs, _, Reason}}}) -> + log_boot_error_and_exit(could_not_initialise_logger, + "failed to create parent directory for log file at '~s', reason: ~p~n", + [LogFile, Reason]); +boot_error(_, {error, {cannot_log_to_file, LogFile, Reason}}) -> + log_boot_error_and_exit(could_not_initialise_logger, + "failed to open log file at '~s', reason: ~p~n", + [LogFile, Reason]); +boot_error(_, {error, {generate_config_file, Error}}) -> + log_boot_error_and_exit(generate_config_file, + "~nConfig file generation failed:~n~s~n", + [Error]); boot_error(Class, Reason) -> LogLocations = log_locations(), log_boot_error_and_exit( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 167e6c96a6..9e94dd8f27 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -74,6 +74,7 @@ -export_type([name/0, qmsg/0, absent_reason/0]). -type name() :: rabbit_types:r('queue'). + -type qpids() :: [pid()]. -type qlen() :: rabbit_types:ok(non_neg_integer()). -type qfun(A) :: fun ((amqqueue:amqqueue()) -> A | no_return()). @@ -1228,12 +1229,11 @@ purge(Q) when ?amqqueue_is_quorum(Q) -> NodeId = amqqueue:get_pid(Q), rabbit_quorum_queue:purge(NodeId). --spec requeue(pid(), +-spec requeue(pid() | amqqueue:ra_server_id(), {rabbit_fifo:consumer_tag(), [msg_id()]}, pid(), quorum_states()) -> 'ok'. - requeue(QPid, {_, MsgIds}, ChPid, QuorumStates) when ?IS_CLASSIC(QPid) -> ok = delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}), QuorumStates; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 59a8cce371..69add85057 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -66,8 +66,9 @@ -export([source/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/4, - prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, handle_post_hibernate/1, + prioritise_call/4, prioritise_cast/3, prioritise_info/3, + format_message_queue/2]). -deprecated([{force_event_refresh, 1, eventually}]). @@ -81,91 +82,96 @@ %% Mgmt HTTP API refactor -export([handle_method/6]). --record(ch, { - %% starting | running | flow | closing - state, - %% same as reader's protocol. Used when instantiating - %% (protocol) exceptions. - protocol, - %% channel number - channel, - %% reader process - reader_pid, - %% writer process - writer_pid, - %% - conn_pid, - %% same as reader's name, see #v1.name - %% in rabbit_reader - conn_name, - %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined - %% or any other channel creating/spawning entity - source, - %% limiter pid, see rabbit_limiter - limiter, - %% none | {Msgs, Acks} | committing | failed | - tx, - %% (consumer) delivery tag sequence - next_tag, - %% messages pending consumer acknowledgement - unacked_message_q, - %% same as #v1.user in the reader, used in - %% authorisation checks - user, - %% same as #v1.user in the reader - virtual_host, - %% when queue.bind's queue field is empty, - %% this name will be used instead - most_recently_declared_queue, - %% a map of queue ref to queue name - queue_names, - %% queue processes are monitored to update - %% queue names - queue_monitors, - %% a map of consumer tags to - %% consumer details: #amqqueue record, acknowledgement mode, - %% consumer exclusivity, etc - consumer_mapping, - %% a map of queue pids to consumer tag lists - queue_consumers, - %% a set of pids of queues that have unacknowledged - %% deliveries - delivering_queues, - %% when a queue is declared as exclusive, queue - %% collector must be notified. - %% see rabbit_queue_collector for more info. - queue_collector_pid, - %% timer used to emit statistics - stats_timer, - %% are publisher confirms enabled for this channel? - confirm_enabled, - %% publisher confirm delivery tag sequence - publish_seqno, - %% a dtree used to track unconfirmed - %% (to publishers) messages - unconfirmed, - %% a list of tags for published messages that were - %% delivered but are yet to be confirmed to the client - confirmed, - %% a list of tags for published messages that were - %% rejected but are yet to be sent to the client - rejected, - %% same as capabilities in the reader - capabilities, - %% tracing exchange resource if tracing is enabled, - %% 'none' otherwise - trace_state, - consumer_prefetch, - %% used by "one shot RPC" (amq. - reply_consumer, - %% flow | noflow, see rabbitmq-server#114 - delivery_flow, - interceptor_state, - queue_states, - queue_cleanup_timer, - %% Message content size limit - max_message_size -}). +-record(conf, { + %% starting | running | flow | closing + state, + %% same as reader's protocol. Used when instantiating + %% (protocol) exceptions. + protocol, + %% channel number + channel, + %% reader process + reader_pid, + %% writer process + writer_pid, + %% + conn_pid, + %% same as reader's name, see #v1.name + %% in rabbit_reader + conn_name, + %% channel's originating source e.g. rabbit_reader | rabbit_direct | undefined + %% or any other channel creating/spawning entity + source, + %% same as #v1.user in the reader, used in + %% authorisation checks + user, + %% same as #v1.user in the reader + virtual_host, + %% when queue.bind's queue field is empty, + %% this name will be used instead + most_recently_declared_queue, + %% when a queue is declared as exclusive, queue + %% collector must be notified. + %% see rabbit_queue_collector for more info. + queue_collector_pid, + + %% same as capabilities in the reader + capabilities, + %% tracing exchange resource if tracing is enabled, + %% 'none' otherwise + trace_state, + consumer_prefetch, + %% Message content size limit + max_message_size, + consumer_timeout + }). + +-record(ch, {cfg :: #conf{}, + %% limiter state, see rabbit_limiter + limiter, + %% none | {Msgs, Acks} | committing | failed | + tx, + %% (consumer) delivery tag sequence + next_tag, + %% messages pending consumer acknowledgement + unacked_message_q, + %% a map of queue ref to queue name + queue_names, + %% queue processes are monitored to update + %% queue names + queue_monitors, + %% a map of consumer tags to + %% consumer details: #amqqueue record, acknowledgement mode, + %% consumer exclusivity, etc + consumer_mapping, + %% a map of queue pids to consumer tag lists + queue_consumers, + %% a set of pids of queues that have unacknowledged + %% deliveries + delivering_queues, + %% timer used to emit statistics + stats_timer, + %% are publisher confirms enabled for this channel? + confirm_enabled, + %% publisher confirm delivery tag sequence + publish_seqno, + %% a dtree used to track unconfirmed + %% (to publishers) messages + unconfirmed, + %% a list of tags for published messages that were + %% delivered but are yet to be confirmed to the client + confirmed, + %% a list of tags for published messages that were + %% rejected but are yet to be sent to the client + rejected, + %% used by "one shot RPC" (amq. + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow, + interceptor_state, + queue_states, + tick_timer + }). -define(QUEUE, lqueue). @@ -483,39 +489,43 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Limiter0 end, MaxMessageSize = get_max_message_size(), - State = #ch{state = starting, - protocol = Protocol, - channel = Channel, - reader_pid = ReaderPid, - writer_pid = WriterPid, - conn_pid = ConnPid, - conn_name = ConnName, - limiter = Limiter, + ConsumerTimeout = get_consumer_timeout(), + State = #ch{cfg = #conf{state = starting, + protocol = Protocol, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + conn_pid = ConnPid, + conn_name = ConnName, + user = User, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + queue_collector_pid = CollectorPid, + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = Prefetch, + max_message_size = MaxMessageSize, + consumer_timeout = ConsumerTimeout + }, + limiter = Limiter, tx = none, next_tag = 1, unacked_message_q = ?QUEUE:new(), - user = User, - virtual_host = VHost, - most_recently_declared_queue = <<>>, queue_names = #{}, queue_monitors = pmon:new(), consumer_mapping = #{}, queue_consumers = #{}, delivering_queues = sets:new(), - queue_collector_pid = CollectorPid, confirm_enabled = false, publish_seqno = 1, unconfirmed = dtree:empty(), rejected = [], confirmed = [], - capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost), - consumer_prefetch = Prefetch, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}, - max_message_size = MaxMessageSize}, + queue_states = #{} + }, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -525,7 +535,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, rabbit_event:if_enabled(State2, #ch.stats_timer, fun() -> emit_stats(State2) end), put_operation_timeout(), - State3 = init_queue_cleanup_timer(State2), + State3 = init_tick_timer(State2), {ok, State3, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -568,8 +578,9 @@ handle_call({{info, Items}, Deadline}, _From, State) -> reply({error, Error}, State) end; -handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> - reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call(refresh_config, _From, + State = #ch{cfg = #conf{virtual_host = VHost} = Cfg}) -> + reply(ok, State#ch{cfg = Cfg#conf{trace_state = rabbit_trace:init(VHost)}}); handle_call(refresh_interceptors, _From, State) -> IState = rabbit_channel_interceptor:init(State), @@ -590,7 +601,7 @@ handle_call(_Request, _From, State) -> noreply(State). handle_cast({method, Method, Content, Flow}, - State = #ch{reader_pid = Reader, + State = #ch{cfg = #conf{reader_pid = Reader}, interceptor_state = IState}) -> case Flow of %% We are going to process a message from the rabbit_reader @@ -617,12 +628,13 @@ handle_cast({method, Method, Content, Flow}, {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_cast(ready_for_close, State = #ch{state = closing, - writer_pid = WriterPid}) -> +handle_cast(ready_for_close, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), {stop, normal, State}; -handle_cast(terminate, State = #ch{writer_pid = WriterPid}) -> +handle_cast(terminate, State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:flush(WriterPid), {stop, normal, State}; @@ -634,12 +646,14 @@ handle_cast({command, Msg}, State) -> ok = send(Msg, State), noreply(State); -handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) -> +handle_cast({deliver, _CTag, _AckReq, _Msg}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver, ConsumerTag, AckRequired, Msg}, State) -> noreply(handle_deliver(ConsumerTag, AckRequired, Msg, State)); -handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) -> +handle_cast({deliver_reply, _K, _Del}, + State = #ch{cfg = #conf{state = closing}}) -> noreply(State); handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) -> noreply(State); @@ -647,8 +661,8 @@ handle_cast({deliver_reply, Key, #delivery{message = #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, + State = #ch{cfg = #conf{writer_pid = WriterPid}, + next_tag = DeliveryTag, reply_consumer = {ConsumerTag, _Suffix, Key}}) -> ok = rabbit_writer:send_command( WriterPid, @@ -662,12 +676,14 @@ handle_cast({deliver_reply, Key, #delivery{message = handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) -> noreply(State); -handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_credit_reply, Len}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), noreply(State); -handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({send_drained, CTagCredit}, + State = #ch{cfg = #conf{writer_pid = WriterPid}}) -> [ok = rabbit_writer:send_command( WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, credit_drained = CreditDrained}) @@ -734,7 +750,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, {internal, MsgSeqNos, Actions, QState1} -> State = State0#ch{queue_states = maps:put(Name, QState1, QueueStates)}, %% execute actions - WriterPid = State#ch.writer_pid, + WriterPid = State#ch.cfg#conf.writer_pid, lists:foreach(fun ({send_credit_reply, Avail}) -> ok = rabbit_writer:send_command( WriterPid, @@ -805,34 +821,45 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; -handle_info({{Ref, Node}, LateAnswer}, State = #ch{channel = Channel}) +handle_info({{Ref, Node}, LateAnswer}, + State = #ch{cfg = #conf{channel = Channel}}) when is_reference(Ref) -> rabbit_log_channel:warning("Channel ~p ignoring late answer ~p from ~p", [Channel, LateAnswer, Node]), noreply(State); -handle_info(queue_cleanup, State = #ch{queue_states = QueueStates0}) -> - QueueStates = +handle_info(tick, State0 = #ch{queue_states = QueueStates0}) -> + QueueStates1 = maps:filter(fun(_, QS) -> - QName = rabbit_quorum_queue:queue_name(QS), - [] /= rabbit_amqqueue:lookup(QName) + QName = rabbit_quorum_queue:queue_name(QS), + [] /= rabbit_amqqueue:lookup([QName]) end, QueueStates0), - noreply(init_queue_cleanup_timer(State#ch{queue_states = QueueStates})); - -handle_info({channel_source, Source}, State = #ch{}) -> - noreply(State#ch{source = Source}). + case evaluate_consumer_timeout(State0#ch{queue_states = QueueStates1}) of + {noreply, State} -> + noreply(init_tick_timer(reset_tick_timer(State))); + Return -> + Return + end; +handle_info({channel_source, Source}, State = #ch{cfg = Cfg}) -> + noreply(State#ch{cfg = Cfg#conf{source = Source}}). -handle_pre_hibernate(State) -> +handle_pre_hibernate(State0) -> ok = clear_permission_cache(), + State = maybe_cancel_tick_timer(State0), rabbit_event:if_enabled( State, #ch.stats_timer, fun () -> emit_stats(State, [{idle_since, os:system_time(milli_seconds)}]) - end), + end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. -terminate(_Reason, State = #ch{user = #user{username = Username}}) -> +handle_post_hibernate(State0) -> + State = init_tick_timer(State0), + {noreply, State}. + +terminate(_Reason, + State = #ch{cfg = #conf{user = #user{username = Username}}}) -> {_Res, _State1} = notify_queues(State), pg_local:leave(rabbit_channels, self()), rabbit_event:if_enabled(State, #ch.stats_timer, @@ -857,6 +884,13 @@ get_max_message_size() -> ?MAX_MSG_SIZE end. +get_consumer_timeout() -> + case application:get_env(rabbit, consumer_timeout) of + {ok, MS} when is_integer(MS) -> + MS; + _ -> + undefined + end. %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -878,22 +912,23 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -send(_Command, #ch{state = closing}) -> +send(_Command, #ch{cfg = #conf{state = closing}}) -> ok; -send(Command, #ch{writer_pid = WriterPid}) -> +send(Command, #ch{cfg = #conf{writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, Command). format_soft_error(#amqp_error{name = N, explanation = E, method = M}) -> io_lib:format("operation ~s caused a channel exception ~s: ~ts", [M, N, E]). -handle_exception(Reason, State = #ch{protocol = Protocol, - channel = Channel, - writer_pid = WriterPid, - reader_pid = ReaderPid, - conn_pid = ConnPid, - conn_name = ConnName, - virtual_host = VHost, - user = User}) -> +handle_exception(Reason, State = #ch{cfg = #conf{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid, + conn_pid = ConnPid, + conn_name = ConnName, + virtual_host = VHost, + user = User + }}) -> %% something bad's happened: notify_queues may not be 'ok' {_Result, State1} = notify_queues(State), case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of @@ -920,11 +955,12 @@ precondition_failed(Format, Params) -> rabbit_misc:protocol_error(precondition_failed, Format, Params). return_queue_declare_ok(#resource{name = ActualName}, - NoWait, MessageCount, ConsumerCount, State) -> - return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, - #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}). + NoWait, MessageCount, ConsumerCount, + #ch{cfg = Cfg} = State) -> + return_ok(State#ch{cfg = Cfg#conf{most_recently_declared_queue = ActualName}}, + NoWait, #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(User, Resource, Perm) -> V = {Resource, Perm}, @@ -962,15 +998,15 @@ check_read_permitted_on_topic(Resource, User, ConnPid, RoutingKey, ChSrc) -> check_user_id_header(#'P_basic'{user_id = undefined}, _) -> ok; check_user_id_header(#'P_basic'{user_id = Username}, - #ch{user = #user{username = Username}}) -> + #ch{cfg = #conf{user = #user{username = Username}}}) -> ok; check_user_id_header( - #'P_basic'{}, #ch{user = #user{authz_backends = - [{rabbit_auth_backend_dummy, _}]}}) -> + #'P_basic'{}, #ch{cfg = #conf{user = #user{authz_backends = + [{rabbit_auth_backend_dummy, _}]}}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual, - tags = Tags}}) -> + #ch{cfg = #conf{user = #user{username = Actual, + tags = Tags}}}) -> case lists:member(impersonator, Tags) of true -> ok; false -> precondition_failed( @@ -1079,18 +1115,18 @@ qbin_to_resource(QueueNameBin, VHostPath) -> name_to_resource(Type, NameBin, VHostPath) -> rabbit_misc:r(VHostPath, Type, NameBin). -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) -> +expand_queue_name_shortcut(<<>>, #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_queue_name_shortcut(QueueNameBin, _) -> QueueNameBin. expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = <<>>}) -> + #ch{cfg = #conf{most_recently_declared_queue = <<>>}}) -> rabbit_misc:protocol_error(not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{most_recently_declared_queue = MRDQ}) -> + #ch{cfg = #conf{most_recently_declared_queue = MRDQ}}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -1178,9 +1214,10 @@ record_confirms(MXs, State = #ch{confirmed = C}) -> handle_method({Method, Content}, State) -> handle_method(Method, Content, State). -handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> +handle_method(#'channel.open'{}, _, + State = #ch{cfg = #conf{state = starting} = Cfg}) -> %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? - State1 = State#ch{state = running}, + State1 = State#ch{cfg = Cfg#conf{state = running}}, rabbit_event:if_enabled(State1, #ch.stats_timer, fun() -> emit_stats(State1) end), {reply, #'channel.open_ok'{}, State1}; @@ -1189,21 +1226,23 @@ handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( channel_error, "second 'channel.open' seen", []); -handle_method(_Method, _, #ch{state = starting}) -> +handle_method(_Method, _, #ch{cfg = #conf{state = starting}}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> +handle_method(#'channel.close_ok'{}, _, #ch{cfg = #conf{state = closing}}) -> stop; -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid, - state = closing}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{state = closing, + writer_pid = WriterPid}}) -> ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), {noreply, State}; -handle_method(_Method, _, State = #ch{state = closing}) -> +handle_method(_Method, _, State = #ch{cfg = #conf{state = closing}}) -> {noreply, State}; -handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> +handle_method(#'channel.close'{}, _, + State = #ch{cfg = #conf{reader_pid = ReaderPid}}) -> {_Result, State1} = notify_queues(State), %% We issue the channel.close_ok response after a handshake with %% the reader, the other half of which is ready_for_close. That @@ -1235,17 +1274,19 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, + Content, State = #ch{cfg = #conf{channel = ChannelNum, + conn_pid = ConnPid, + source = ChSrc, + conn_name = ConnName, + virtual_host = VHostPath, + user = #user{username = Username} = User, + trace_state = TraceState, + max_message_size = MaxMessageSize + }, tx = Tx, - channel = ChannelNum, confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid, - source = ChSrc, - max_message_size = MaxMessageSize}) -> + delivery_flow = Flow + }) -> check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), @@ -1300,12 +1341,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, end}; handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - limiter = Limiter, + _, State = #ch{cfg = #conf{writer_pid = WriterPid, + conn_pid = ConnPid, + user = User, + virtual_host = VHostPath + }, + limiter = Limiter, next_tag = DeliveryTag, - user = User, - virtual_host = VHostPath, queue_states = QueueStates0}) -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), check_read_permitted(QueueName, User), @@ -1385,10 +1427,11 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{consumer_prefetch = ConsumerPrefetch, - consumer_mapping = ConsumerMapping, - user = User, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch, + user = User, + virtual_host = VHostPath}, + consumer_mapping = ConsumerMapping + }) -> case maps:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, VHostPath), @@ -1420,9 +1463,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, end; handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{consumer_mapping = ConsumerMapping, + _, State = #ch{cfg = #conf{user = #user{username = Username}}, + consumer_mapping = ConsumerMapping, queue_consumers = QCons, - user = #user{username = Username}, queue_states = QueueStates0}) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case maps:find(ConsumerTag, ConsumerMapping) of @@ -1470,10 +1513,11 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{global = false, prefetch_count = PrefetchCount}, - _, State = #ch{limiter = Limiter}) -> + _, State = #ch{cfg = Cfg, + limiter = Limiter}) -> %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), - {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, + {reply, #'basic.qos_ok'{}, State#ch{cfg = Cfg#conf{consumer_prefetch = PrefetchCount}, limiter = Limiter1}}; handle_method(#'basic.qos'{global = true, @@ -1531,87 +1575,87 @@ handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, reject(DeliveryTag, Requeue, false, State); handle_method(#'exchange.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - user = User, - queue_collector_pid = CollectorPid, - conn_pid = ConnPid, - source = ChSrc}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + user = User, + queue_collector_pid = CollectorPid, + conn_pid = ConnPid, + source = ChSrc}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.delete'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.delete_ok'{}); handle_method(#'exchange.bind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.bind_ok'{}); handle_method(#'exchange.unbind'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'exchange.unbind_ok'{}); handle_method(#'queue.declare'{nowait = NoWait} = Method, - _, State = #ch{virtual_host = VHostPath, - conn_pid = ConnPid, - source = ChSrc, - queue_collector_pid = CollectorPid, - user = User}) -> + _, State = #ch{cfg = #conf{virtual_host = VHostPath, + conn_pid = ConnPid, + source = ChSrc, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, QueueName, MessageCount, ConsumerCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); handle_method(#'queue.delete'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - virtual_host = VHostPath, - queue_collector_pid = CollectorPid, - user = User}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + virtual_host = VHostPath, + queue_collector_pid = CollectorPid, + user = User}}) -> {ok, PurgedMessageCount} = handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.delete_ok'{message_count = PurgedMessageCount}); handle_method(#'queue.bind'{nowait = NoWait} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, NoWait, #'queue.bind_ok'{}); handle_method(#'queue.unbind'{} = Method, _, - State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User), return_ok(State, false, #'queue.unbind_ok'{}); handle_method(#'queue.purge'{nowait = NoWait} = Method, - _, State = #ch{conn_pid = ConnPid, - source = ChSrc, - user = User, - queue_collector_pid = CollectorPid, - virtual_host = VHostPath}) -> + _, State = #ch{cfg = #conf{conn_pid = ConnPid, + source = ChSrc, + user = User, + queue_collector_pid = CollectorPid, + virtual_host = VHostPath}}) -> case handle_method(Method, ConnPid, ChSrc, CollectorPid, VHostPath, User) of {ok, PurgedMessageCount} -> @@ -1690,11 +1734,11 @@ handle_method(_MethodRecord, _Content, _State) -> %% for why. basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, - State = #ch{conn_pid = ConnPid, - limiter = Limiter, + State = #ch{cfg = #conf{conn_pid = ConnPid, + user = #user{username = Username}}, + limiter = Limiter, consumer_mapping = ConsumerMapping, - user = #user{username = Username}, - queue_states = QueueStates0}) -> + queue_states = QueueStates0}) -> case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> @@ -1814,8 +1858,9 @@ handle_consuming_queue_down_or_eol(QRef, %% not an HA failover. But the likelihood is not great and most users %% are unlikely to care. -cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities, - consumer_mapping = CMap}) -> +cancel_consumer(CTag, QName, + State = #ch{cfg = #conf{capabilities = Capabilities}, + consumer_mapping = CMap}) -> case rabbit_misc:table_lookup( Capabilities, <<"consumer_cancel_notify">>) of {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag, @@ -1885,7 +1930,8 @@ binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - State = #ch{protocol = Protocol, writer_pid = WriterPid}, + State = #ch{cfg = #conf{protocol = Protocol, + writer_pid = WriterPid}}, Reason) -> ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), @@ -1920,12 +1966,14 @@ internal_reject(Requeue, Acked, Limiter, record_sent(Type, Tag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, - State = #ch{unacked_message_q = UAMQ, - next_tag = DeliveryTag, - trace_state = TraceState, - user = #user{username = Username}, - conn_name = ConnName, - channel = ChannelNum}) -> + State = #ch{cfg = #conf{channel = ChannelNum, + trace_state = TraceState, + user = #user{username = Username}, + conn_name = ConnName + }, + unacked_message_q = UAMQ, + next_tag = DeliveryTag + }) -> ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of {get, true} -> get; {get, false} -> get_no_ack; @@ -1936,10 +1984,11 @@ record_sent(Type, Tag, AckRequired, true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, + DeliveredAt = os:system_time(millisecond), rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, DeliveredAt, + {QPid, MsgId}}, UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. @@ -1952,7 +2001,7 @@ collect_acks(Q, DeliveryTag, Multiple) -> collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> case ?QUEUE:out(Q) of - {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, + {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Time, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], @@ -2007,9 +2056,10 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% ack first" order. new_tx() -> {?QUEUE:new(), []}. -notify_queues(State = #ch{state = closing}) -> +notify_queues(State = #ch{cfg = #conf{state = closing}}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, + cfg = Cfg, delivering_queues = DQ }) -> QRefs0 = sets:to_list( sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), @@ -2017,15 +2067,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers, QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), - State#ch{state = closing}}. + State#ch{cfg = Cfg#conf{state = closing}}}. foreach_per_queue(_F, [], Acc) -> Acc; -foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) -> +foreach_per_queue(F, [{_DTag, CTag, _Time, {QPid, MsgId}}], Acc) -> %% quorum queue, needs the consumer tag F({QPid, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> - T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + T = lists:foldl(fun ({_DTag, CTag, _Time, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). @@ -2061,8 +2111,9 @@ notify_limiter(Limiter, Acked) -> deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, confirm = false, mandatory = false}, - []}, State) -> %% optimisation + _RoutedToQs = []}, State) -> %% optimisation ?INCR_STATS(exchange_stats, XName, 1, publish, State), + ?INCR_STATS(exchange_stats, XName, 1, drop_unroutable, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, @@ -2115,9 +2166,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ end, State2#ch{queue_states = QueueStates}. -process_routing_mandatory(true, [], Msg, State) -> +process_routing_mandatory(_Mandatory = true, + _RoutedToQs = [], + Msg, State) -> ok = basic_return(Msg, State, no_route), ok; +process_routing_mandatory(_Mandatory = false, + _RoutedToQs = [], + #basic_message{exchange_name = ExchangeName}, State) -> + ?INCR_STATS(exchange_stats, ExchangeName, 1, drop_unroutable, State), + ok; process_routing_mandatory(_, _, _, _) -> ok. @@ -2167,7 +2225,7 @@ send_confirms_and_nacks(State) -> send_nacks([], _, State) -> State; -send_nacks(_Rs, _, State = #ch{state = closing}) -> %% optimisation +send_nacks(_Rs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_nacks(Rs, Cs, State) -> coalesce_and_send(Rs, Cs, @@ -2178,7 +2236,7 @@ send_nacks(Rs, Cs, State) -> send_confirms([], _, State) -> State; -send_confirms(_Cs, _, State = #ch{state = closing}) -> %% optimisation +send_confirms(_Cs, _, State = #ch{cfg = #conf{state = closing}}) -> %% optimisation State; send_confirms([MsgSeqNo], _, State) -> ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State), @@ -2243,14 +2301,14 @@ infos(Items, Deadline, State) -> end || Item <- Items]. i(pid, _) -> self(); -i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; -i(number, #ch{channel = Channel}) -> Channel; -i(user, #ch{user = User}) -> User#user.username; +i(connection, #ch{cfg = #conf{conn_pid = ConnPid}}) -> ConnPid; +i(number, #ch{cfg = #conf{channel = Channel}}) -> Channel; +i(user, #ch{cfg = #conf{user = User}}) -> User#user.username; i(user_who_performed_action, Ch) -> i(user, Ch); -i(vhost, #ch{virtual_host = VHost}) -> VHost; +i(vhost, #ch{cfg = #conf{virtual_host = VHost}}) -> VHost; i(transactional, #ch{tx = Tx}) -> Tx =/= none; i(confirm, #ch{confirm_enabled = CE}) -> CE; -i(source, #ch{source = ChSrc}) -> ChSrc; +i(source, #ch{cfg = #conf{source = ChSrc}}) -> ChSrc; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); @@ -2259,9 +2317,9 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; -i(state, #ch{state = running}) -> credit_flow:state(); -i(state, #ch{state = State}) -> State; -i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); +i(state, #ch{cfg = #conf{state = State}}) -> State; +i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(interceptors, #ch{interceptor_state = IState}) -> @@ -2274,7 +2332,7 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). -name(#ch{conn_name = ConnName, channel = Channel}) -> +name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). emit_stats(State) -> emit_stats(State, []). @@ -2297,9 +2355,9 @@ erase_queue_stats(QName) -> end || {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(), QName0 =:= QName]. -get_vhost(#ch{virtual_host = VHost}) -> VHost. +get_vhost(#ch{cfg = #conf{virtual_host = VHost}}) -> VHost. -get_user(#ch{user = User}) -> User. +get_user(#ch{cfg = #conf{user = User}}) -> User. delete_stats({queue_stats, QName}) -> rabbit_core_metrics:channel_queue_down({self(), QName}); @@ -2559,7 +2617,7 @@ handle_deliver(ConsumerTag, AckRequired, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}, - State = #ch{writer_pid = WriterPid, + State = #ch{cfg = #conf{writer_pid = WriterPid}, next_tag = DeliveryTag}) -> Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, @@ -2592,9 +2650,29 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, State1 = track_delivering_queue(NoAck, QPid, QName, State), {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#ch{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. +init_tick_timer(State = #ch{tick_timer = undefined}) -> + {ok, Interval} = application:get_env(rabbit, channel_tick_interval), + State#ch{tick_timer = erlang:send_after(Interval, self(), tick)}; +init_tick_timer(State) -> + State. + +reset_tick_timer(State) -> + State#ch{tick_timer = undefined}. + +maybe_cancel_tick_timer(#ch{tick_timer = undefined} = State) -> + State; +maybe_cancel_tick_timer(#ch{tick_timer = TRef, + unacked_message_q = UMQ} = State) -> + case ?QUEUE:len(UMQ) of + 0 -> + %% we can only cancel the tick timer if the unacked messages + %% queue is empty. + _ = erlang:cancel_timer(TRef), + State#ch{tick_timer = undefined}; + _ -> + %% let the timer continue + State + end. %% only classic queues need monitoring so rather than special casing %% everywhere monitors are set up we wrap it here for this module @@ -2612,7 +2690,7 @@ add_delivery_count_header(#{delivery_count := Count}, Msg) -> add_delivery_count_header(_, Msg) -> Msg. -qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; %% assume it already is a ref qpid_to_ref(Ref) -> Ref. @@ -2632,3 +2710,60 @@ queue_fold(Fun, Init, Q) -> {empty, _Q} -> Init; {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) end. + +evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel, + capabilities = Capabilities, + consumer_timeout = Timeout}, + queue_names = QNames, + queue_consumers = QCons, + unacked_message_q = UAMQ}) -> + Now = os:system_time(millisecond), + case ?QUEUE:peek(UAMQ) of + {value, {_DTag, ConsumerTag, Time, {QPid, _Msg}}} + when is_integer(Timeout) + andalso Time < Now - Timeout -> + rabbit_log_channel:info("Consumer ~w on Channel ~w has timed out " + "waiting on ack", + [rabbit_data_coercion:to_binary(ConsumerTag), + Channel]), + SupportsCancel = case rabbit_misc:table_lookup( + Capabilities, + <<"consumer_cancel_notify">>) of + {bool, true} when is_binary(ConsumerTag) -> + true; + _ -> false + end, + case SupportsCancel of + false -> + Ex = rabbit_misc:amqp_error(precondition_failed, + "consumer ack timed out on channel ~w", + [Channel], none), + handle_exception(Ex, State0); + true -> + QRef = qpid_to_ref(QPid), + QName = maps:get(QRef, QNames), + %% cancel the consumer with the client + State2 = cancel_consumer(ConsumerTag, QName, State0), + [Q] = rabbit_amqqueue:lookup([QName]), + %% send basic cancel to the queue + {ok, QueueStates2} = rabbit_amqqueue:basic_cancel( + Q, self(), ConsumerTag, undefined, + <<"broker">>, State2#ch.queue_states), + %% return all in-flight messages for the consumer + {MsgIds, Rem} = lists:foldl( + fun({_DelTag, ConTag, _Time, {_, MsgId}}, + {Ids, Rem}) when ConTag == ConsumerTag -> + {[MsgId | Ids], Rem}; + (Unacked, {Ids, Rem}) -> + {Ids, ?QUEUE:in(Unacked, Rem)} + end, {[], ?QUEUE:new()}, + ?QUEUE:to_list(UAMQ)), + QueueStates = rabbit_amqqueue:requeue(QPid, {ConsumerTag, MsgIds}, + self(), QueueStates2), + {noreply, State2#ch{queue_states = QueueStates, + queue_consumers = maps:remove(QRef, QCons), + unacked_message_q = Rem}} + end; + _ -> + {noreply, State0} + end. diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index f4980aec7d..99ad8eef34 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -157,7 +157,7 @@ gc_process_and_entity(Table, GbSet) -> ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _, _}, none) when Table == channel_queue_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); - ({{Pid, Id} = Key, _, _, _, _}, none) + ({{Pid, Id} = Key, _, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_process_and_entity(Id, Pid, Table, Key, GbSet); ({{Id, Pid, _} = Key, _, _, _, _, _, _}, none) diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 7368f2b8a2..e20a8f2f72 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -20,13 +20,72 @@ -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). -export([peer_cert_subject_items/2, peer_cert_auth_name/1]). +-export([cipher_suites_erlang/2, cipher_suites_erlang/1, + cipher_suites_openssl/2, cipher_suites_openssl/1, + cipher_suites/1]). %%-------------------------------------------------------------------------- -export_type([certificate/0]). +% Due to API differences between OTP releases. +-dialyzer(no_missing_calls). +-ignore_xref([{ssl_cipher_format, erl_suite_definition, 1}, + {ssl_cipher_format, suite, 1}, + {ssl_cipher_format, openssl_suite_name, 1}]). + -type certificate() :: rabbit_cert_info:certificate(). +-type cipher_suites_mode() :: default | all | anonymous. + +-spec cipher_suites(cipher_suites_mode()) -> ssl:ciphers(). +cipher_suites(Mode) -> + Version = get_highest_protocol_version(), + ssl:cipher_suites(Mode, Version). + +-spec cipher_suites_erlang(cipher_suites_mode()) -> + [ssl:old_cipher_suite()]. +cipher_suites_erlang(Mode) -> + Version = get_highest_protocol_version(), + cipher_suites_erlang(Mode, Version). + +-spec cipher_suites_erlang(cipher_suites_mode(), + ssl:protocol_version() | tls_record:tls_version()) -> + [ssl:old_cipher_suite()]. +cipher_suites_erlang(Mode, Version) -> + [ format_cipher_erlang(C) + || C <- ssl:cipher_suites(Mode, Version) ]. + +-spec cipher_suites_openssl(cipher_suites_mode()) -> + [ssl:old_cipher_suite()]. +cipher_suites_openssl(Mode) -> + Version = get_highest_protocol_version(), + cipher_suites_openssl(Mode, Version). + +-spec cipher_suites_openssl(cipher_suites_mode(), + ssl:protocol_version() | tls_record:tls_version()) -> + [ssl:old_cipher_suite()]. +cipher_suites_openssl(Mode, Version) -> + lists:filtermap(fun(C) -> + OpenSSL = format_cipher_openssl(C), + case is_list(OpenSSL) of + true -> {true, OpenSSL}; + false -> false + end + end, + ssl:cipher_suites(Mode, Version)). + + +format_cipher_erlang(Cipher) -> + ssl_cipher_format:erl_suite_definition(ssl_cipher_format:suite(Cipher)). + +format_cipher_openssl(Cipher) -> + ssl_cipher_format:openssl_suite_name(ssl_cipher_format:suite(Cipher)). + +-spec get_highest_protocol_version() -> tls_record:tls_version(). +get_highest_protocol_version() -> + tls_record:highest_protocol_version([]). + %%-------------------------------------------------------------------------- %% High-level functions used by reader %%-------------------------------------------------------------------------- diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 702d96c7dc..64abbe56ff 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -37,6 +37,7 @@ memory() -> MnesiaETS = mnesia_memory(), MsgIndexETS = ets_memory(msg_stores()), MetricsETS = ets_memory([rabbit_metrics]), + QuorumETS = ets_memory([ra_log_ets]), MetricsProc = try [{_, M}] = process_info(whereis(rabbit_metrics), [memory]), M @@ -87,7 +88,8 @@ memory() -> %% ETS {mnesia, MnesiaETS}, - {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS}, + {quorum_ets, QuorumETS}, + {other_ets, ETS - MnesiaETS - MetricsETS - MgmtDbETS - MsgIndexETS - QuorumETS}, %% Messages (mostly, some binaries are not messages) {binary, Bin}, diff --git a/test/consumer_timeout_SUITE.erl b/test/consumer_timeout_SUITE.erl new file mode 100644 index 0000000000..8817b93c03 --- /dev/null +++ b/test/consumer_timeout_SUITE.erl @@ -0,0 +1,272 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(consumer_timeout_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [consumer_timeout, + consumer_timeout_basic_get, + consumer_timeout_no_basic_cancel_capability + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests}, + {mirrored_queue, [parallel], AllTests}, + {quorum_queue, [parallel], AllTests} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + Skip -> + Skip + end; +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config0) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}, + {consumer_timeout, 5000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config0, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +consumer_timeout(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + receive + #'basic.cancel'{ } -> + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + ok + after 20000 -> + flush(1), + exit(cancel_never_happened) + end + after 5000 -> + exit(deliver_timeout) + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. + +consumer_timeout_basic_get(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_DelTag] = consume(Ch, QName, [<<"msg1">>]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. + + +-define(CLIENT_CAPABILITIES, + [{<<"publisher_confirms">>, bool, true}, + {<<"exchange_exchange_bindings">>, bool, true}, + {<<"basic.nack">>, bool, true}, + {<<"consumer_cancel_notify">>, bool, false}, + {<<"connection.blocked">>, bool, true}, + {<<"authentication_failure_close">>, bool, true}]). + +consumer_timeout_no_basic_cancel_capability(Config) -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Props = [{<<"capabilities">>, table, ?CLIENT_CAPABILITIES}], + AmqpParams = #amqp_params_network{port = Port, + host = "localhost", + virtual_host = <<"/">>, + client_properties = Props + }, + {ok, Conn} = amqp_connection:start(AmqpParams), + {ok, Ch} = amqp_connection:open_channel(Conn), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + erlang:monitor(process, Conn), + erlang:monitor(process, Ch), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = _, + redelivered = false}, _} -> + %% do nothing with the delivery should trigger timeout + ok + after 5000 -> + exit(deliver_timeout) + end, + receive + {'DOWN', _, process, Ch, _} -> ok + after 30000 -> + flush(1), + exit(channel_exit_expected) + end, + receive + {'DOWN', _, process, Conn, _} -> + flush(1), + exit(unexpected_connection_exit) + after 2000 -> + ok + end, + ok. +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% + +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = Ctag}, + self()), + receive + #'basic.consume_ok'{consumer_tag = Ctag} -> + ok + end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index eba0965608..632a314d21 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -58,7 +58,7 @@ groups() -> delete_immediately_by_resource ], [ - {parallel_tests, [], + {parallel_tests, [], [ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, @@ -129,19 +129,22 @@ init_per_group(mirrored_queue, Config) -> {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, true}]), rabbit_ct_helpers:run_steps(Config1, []); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> case lists:member({group, Group}, all()) of true -> ClusterSize = 2, - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, ClusterSize} - ]), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{channel_tick_interval, 1000}, + {quorum_tick_interval, 1000}]}), + Config1 = rabbit_ct_helpers:set_config( + Config, [ {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()); + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); false -> - rabbit_ct_helpers:run_steps(Config, []) + rabbit_ct_helpers:run_steps(Config0, []) end. end_per_group(Group, Config) -> @@ -193,7 +196,7 @@ consume_first_empty(Config) -> consume_empty(Ch, QName), publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - consume(Ch, QName, [<<"msg1">>]), + consume(Ch, QName, true, [<<"msg1">>]), rabbit_ct_client_helpers:close_channel(Ch). consume_from_empty_queue(Config) -> @@ -268,7 +271,9 @@ consume_and_ack(Config) -> [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -281,7 +286,9 @@ consume_and_multiple_ack(Config) -> wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -296,7 +303,9 @@ subscribe_and_ack(Config) -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_ack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -314,7 +323,9 @@ subscribe_and_multiple_ack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -343,7 +354,9 @@ subscribe_and_requeue_multiple_nack(Config) -> multiple = true}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -357,7 +370,9 @@ consume_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -371,7 +386,9 @@ consume_and_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_requeue_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -385,7 +402,9 @@ consume_and_requeue_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = true}), - wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. consume_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -399,7 +418,9 @@ consume_and_multiple_nack(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = true, requeue = false}), - wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_requeue_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -423,7 +444,9 @@ subscribe_and_requeue_nack(Config) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) end - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -441,7 +464,9 @@ subscribe_and_nack(Config) -> multiple = false, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. subscribe_and_multiple_nack(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -461,7 +486,9 @@ subscribe_and_multiple_nack(Config) -> multiple = true, requeue = false}), wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. %% TODO test with single active basic_cancel(Config) -> @@ -472,11 +499,12 @@ basic_cancel(Config) -> publish(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - subscribe(Ch, QName, false), + CTag = atom_to_binary(?FUNCTION_NAME, utf8), + subscribe(Ch, QName, false, CTag), receive {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), ?assertEqual([], lists:filter(fun(Props) -> @@ -489,7 +517,9 @@ basic_cancel(Config) -> wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) after 5000 -> exit(basic_deliver_timeout) - end. + end, + rabbit_ct_client_helpers:close_channel(Ch), + ok. purge(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -501,7 +531,9 @@ purge(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), - wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. basic_recover(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -513,7 +545,9 @@ basic_recover(Config) -> [_] = consume(Ch, QName, [<<"msg1">>]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_fails(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -531,7 +565,9 @@ delete_immediately_by_pid_fails(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_pid_succeeds(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -550,7 +586,9 @@ delete_immediately_by_pid_succeeds(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. delete_immediately_by_resource(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -568,7 +606,9 @@ delete_immediately_by_resource(Config) -> durable = Durable, passive = true, auto_delete = false, - arguments = Args})). + arguments = Args})), + rabbit_ct_client_helpers:close_channel(Ch), + ok. %%%%%%%%%%%%%%%%%%%%%%%% %% Test helpers @@ -600,12 +640,15 @@ consume_empty(Ch, QName) -> amqp_channel:call(Ch, #'basic.get'{queue = QName})). subscribe(Ch, Queue, NoAck) -> + subscribe(Ch, Queue, NoAck, <<"ctag">>). + +subscribe(Ch, Queue, NoAck, Ctag) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, - consumer_tag = <<"ctag">>}, + consumer_tag = Ctag}, self()), receive - #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + #'basic.consume_ok'{consumer_tag = Ctag} -> ok end. @@ -614,3 +657,11 @@ receive_basic_deliver(Redelivered) -> {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> ok end. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 61f9328855..c23b7ac85e 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -172,7 +172,7 @@ init_per_group(Group, Config) -> ok -> ok = rabbit_ct_broker_helpers:rpc( Config2, 0, application, set_env, - [rabbit, channel_queue_cleanup_interval, 100]), + [rabbit, channel_tick_interval, 100]), %% HACK: the larger cluster sizes benefit for a bit more time %% after clustering before running the tests. case Group of diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index ea6e973ca2..c44b799caa 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -177,6 +177,8 @@ channel_metrics(Config) -> amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}), amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"queue_metrics">>}, #amqp_msg{payload = <<"hello">>}), + amqp_channel:cast(Ch, #'basic.publish'{routing_key = <<"won't route $ยข% anywhere">>}, + #amqp_msg{payload = <<"hello">>}), {#'basic.get_ok'{}, _} = amqp_channel:call(Ch, #'basic.get'{queue = <<"queue_metrics">>, no_ack=true}), timer:sleep(150), diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index 866d529489..ed64fcf1c5 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -39,7 +39,8 @@ groups() -> file_handle_cache, %% Change FHC limit. head_message_timestamp_statistics, %% Expect specific statistics. log_management, %% Check log files. - log_management_during_startup, %% Check log files. + log_file_initialised_during_startup, + log_file_fails_to_initialise_during_startup, externally_rotated_logs_are_automatically_reopened %% Check log files. ]} ]. @@ -271,11 +272,11 @@ log_management1(_Config) -> ok = test_logs_working([LogFile]), passed. -log_management_during_startup(Config) -> +log_file_initialised_during_startup(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, log_management_during_startup1, [Config]). + ?MODULE, log_file_initialised_during_startup1, [Config]). -log_management_during_startup1(_Config) -> +log_file_initialised_during_startup1(_Config) -> [LogFile|_] = rabbit:log_locations(), Suffix = ".0", @@ -299,57 +300,72 @@ log_management_during_startup1(_Config) -> application:unset_env(lager, extra_sinks), ok = rabbit:start(), - %% start application with logging to directory with no - %% write permissions - ok = rabbit:stop(), - NoPermission1 = "/var/empty/test.log", - delete_file(NoPermission1), - delete_file(filename:dirname(NoPermission1)), - ok = rabbit:stop(), - ok = application:set_env(rabbit, lager_default_file, NoPermission1), + %% clean up + ok = application:set_env(rabbit, lager_default_file, LogFile), application:unset_env(rabbit, log), application:unset_env(lager, handlers), application:unset_env(lager, extra_sinks), - ok = try rabbit:start() of + ok = rabbit:start(), + passed. + + +log_file_fails_to_initialise_during_startup(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, log_file_fails_to_initialise_during_startup1, [Config]). + +log_file_fails_to_initialise_during_startup1(_Config) -> + [LogFile|_] = rabbit:log_locations(), + Suffix = ".0", + + %% start application with logging to directory with no + %% write permissions + ok = rabbit:stop(), + + Run1 = fun() -> + NoPermission1 = "/var/empty/test.log", + delete_file(NoPermission1), + delete_file(filename:dirname(NoPermission1)), + ok = rabbit:stop(), + ok = application:set_env(rabbit, lager_default_file, NoPermission1), + application:unset_env(rabbit, log), + application:unset_env(lager, handlers), + application:unset_env(lager, extra_sinks), + rabbit:start() + end, + + ok = try Run1() of ok -> exit({got_success_but_expected_failure, log_rotation_no_write_permission_dir_test}) catch - _:{error, {cannot_log_to_file, _, Reason1}} - when Reason1 =:= enoent orelse Reason1 =:= eacces -> ok; - _:{error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason1}}} - when Reason1 =:= eperm orelse - Reason1 =:= eacces orelse - Reason1 =:= enoent-> ok + _:could_not_initialise_logger -> ok end, %% start application with logging to a subdirectory which %% parent directory has no write permissions NoPermission2 = "/var/empty/non-existent/test.log", - delete_file(NoPermission2), - delete_file(filename:dirname(NoPermission2)), - case rabbit:stop() of - ok -> ok; - {error, lager_not_running} -> ok + + Run2 = fun() -> + delete_file(NoPermission2), + delete_file(filename:dirname(NoPermission2)), + case rabbit:stop() of + ok -> ok; + {error, lager_not_running} -> ok + end, + ok = application:set_env(rabbit, lager_default_file, NoPermission2), + application:unset_env(rabbit, log), + application:unset_env(lager, handlers), + application:unset_env(lager, extra_sinks), + rabbit:start() end, - ok = application:set_env(rabbit, lager_default_file, NoPermission2), - application:unset_env(rabbit, log), - application:unset_env(lager, handlers), - application:unset_env(lager, extra_sinks), - ok = try rabbit:start() of + + ok = try Run2() of ok -> exit({got_success_but_expected_failure, log_rotation_parent_dirs_test}) catch - _:{error, {cannot_log_to_file, _, Reason2}} - when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; - _:{error, {cannot_log_to_file, _, - {cannot_create_parent_dirs, _, Reason2}}} - when Reason2 =:= eperm orelse - Reason2 =:= eacces orelse - Reason2 =:= enoent-> ok + _:could_not_initialise_logger -> ok end, - %% cleanup + %% clean up ok = application:set_env(rabbit, lager_default_file, LogFile), application:unset_env(rabbit, log), application:unset_env(lager, handlers), @@ -494,7 +510,7 @@ channel_statistics1(_Config) -> [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 0}] = ets:lookup( channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [{{Ch, {QRes, X}}, 1, 0}] = ets:lookup( @@ -509,7 +525,7 @@ channel_statistics1(_Config) -> [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0, 1}] = ets:lookup( channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [{{Ch, {QRes, X}}, 1, 1}] = ets:lookup( @@ -522,7 +538,7 @@ channel_statistics1(_Config) -> force_metric_gc(), Check4 = fun() -> [] = ets:lookup(channel_queue_metrics, {Ch, QRes}), - [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup( + [{{Ch, X}, 1, 0, 0, 0, 0}] = ets:lookup( channel_exchange_metrics, {Ch, X}), [] = ets:lookup(channel_queue_exchange_metrics, |