diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-01-21 15:51:06 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-01-21 15:52:10 +0000 |
commit | db92a4a4cbc3f9ef293cbbcabc3e99085cb22761 (patch) | |
tree | c33e2f592311fe9a7a5565b58b22bfb4e8492dd7 | |
parent | 9e9093bedcf022a2f95a44171338352a23ca1268 (diff) | |
download | rabbitmq-server-git-db92a4a4cbc3f9ef293cbbcabc3e99085cb22761.tar.gz |
optimisations
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 17 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 89 | ||||
-rw-r--r-- | src/rabbit_looking_glass.erl | 59 | ||||
-rw-r--r-- | src/rabbit_stream2_queue.erl | 68 |
5 files changed, 149 insertions, 86 deletions
@@ -96,7 +96,7 @@ define PROJECT_ENV {msg_store_io_batch_size, 4096}, %% see rabbitmq-server#143, %% rabbitmq-server#949, rabbitmq-server#1098 - {credit_flow_default_credit, {400, 200}}, + {credit_flow_default_credit, {4000, 2000}}, {quorum_commands_soft_limit, 256}, {quorum_cluster_size, 5}, %% see rabbitmq-server#248 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 31eb9229fc..1658952ade 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -1591,13 +1591,14 @@ ack(QPid, {CTag, QName, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) -> #{QName := QState0} when element(1, QState0) == stream2_client -> %% stream2 cqueue - {ok, QState} = rabbit_stream2_queue:credit(QState0, CTag, - length(MsgIds)), - maps:put(QName, QState, QueueStates); + {ok, Messages, QState} = rabbit_stream2_queue:credit(QState0, CTag, + length(MsgIds)), + {maps:put(QName, QState, QueueStates), Messages}; _ -> %% classic - delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}), - QueueStates + delegate:invoke_no_result(QPid, {gen_server2, cast, + [{ack, MsgIds, ChPid}]}), + {QueueStates, []} end; ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) when ?IS_QUORUM(QPid) -> @@ -1610,13 +1611,13 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates) %% rather than an explicit msgid {ok, QState} = rabbit_stream_queue:credit(QState0, CTag, length(MsgIds)), - maps:put(Name, QState, QuorumStates); + {maps:put(Name, QState, QuorumStates), []}; #{Name := QState0} -> {ok, QState} = rabbit_quorum_queue:ack(CTag, MsgIds, QState0), - maps:put(Name, QState, QuorumStates); + {maps:put(Name, QState, QuorumStates), []}; _ -> %% queue was not found - QuorumStates + {QuorumStates, []} end. -spec reject(pid() | amqqueue:ra_server_id(), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8f0ca48377..dc3d20f587 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -840,10 +840,20 @@ handle_info({ra_event, {Name, _} = From, EvtBody} = Evt, handle_info({osiris_offset, QName, Offs}, #ch{queue_states = QueueStates0} = State) -> + % rabbit_log:info("osiris_offset ~w ~w ~w", [QName, Offs, QueueStates0]), case QueueStates0 of #{QName := QState0} when ?IS_STREAM2(QState0) -> - QState = rabbit_stream2_queue:handle_offset(QState0, Offs), - noreply(State#ch{queue_states = QueueStates0#{QName => QState}}); + {QState, TagMsgs} = rabbit_stream2_queue:handle_offset(QState0, Offs), + % rabbit_log:info("osiris_offset tag msgs ~w", [TagMsgs]), + noreply( + lists:foldl( + fun({Tag, LeaderPid, OffsetMsgs}, Acc) -> + handle_stream_deliveries(Tag, LeaderPid, + QName, OffsetMsgs, Acc) + end, + State#ch{queue_states = QueueStates0#{QName => QState}}, + TagMsgs) + ); _ -> noreply(State) end; @@ -951,6 +961,7 @@ handle_post_hibernate(State0) -> terminate(_Reason, State = #ch{cfg = #conf{user = #user{username = Username}}}) -> + rabbit_log:warning("terminate with ~w", [_Reason]), {_Res, _State1} = notify_queues(State), pg_local:leave(rabbit_channels, self()), rabbit_event:if_enabled(State, #ch.stats_timer, @@ -2121,24 +2132,28 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> end. %% NB: Acked is in youngest-first order -ack(Acked, State = #ch{queue_names = QNames, - queue_states = QueueStates0}) -> - QueueStates = - foreach_per_queue( - fun ({QPid, QName, CTag}, MsgIds, Acc0) -> - Acc = rabbit_amqqueue:ack(QPid, {CTag, QName, MsgIds}, self(), Acc0), - incr_queue_stats(QPid, QNames, MsgIds, State), - Acc - end, Acked, QueueStates0), +ack(Acked, State) -> ok = notify_limiter(State#ch.limiter, Acked), - State#ch{queue_states = QueueStates}. - -incr_queue_stats(QPid, QNames, MsgIds, State) -> - case maps:find(qpid_to_ref(QPid), QNames) of - {ok, QName} -> Count = length(MsgIds), - ?INCR_STATS(queue_stats, QName, Count, ack, State); - error -> ok - end. + foreach_per_queue( + fun ({QPid, QName, CTag}, MsgIds, + #ch{queue_states = QS0} = Acc) -> + {QS, {_, _, OffsetMsgs}} = rabbit_amqqueue:ack(QPid, + {CTag, QName, MsgIds}, + self(), QS0), + incr_queue_stats(QName, MsgIds, State), + handle_stream_deliveries(CTag, QPid, + QName, OffsetMsgs, + Acc#ch{queue_states = QS}) + end, Acked, State). + % lists:foldl( + % fun({Tag, LeaderPid, OffsetMsgs}, Acc) -> + % handle_stream_deliveries(Tag, LeaderPid, + % QName, OffsetMsgs, Acc) + % end, State#ch{queue_states = QueueStates}, TagMessages). + +incr_queue_stats(QName, MsgIds, State) -> + Count = length(MsgIds), + ?INCR_STATS(queue_stats, QName, Count, ack, State). %% {Msgs, Acks} %% @@ -2739,6 +2754,42 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, check_not_default_exchange(ExchangeName), _ = rabbit_exchange:lookup_or_die(ExchangeName). +handle_stream_deliveries(_ConsumerTag, _QPid, _QName, [], State) -> + State; +handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins, + #ch{cfg = #conf{writer_pid = WriterPid}, + writer_gc_threshold = GCThreshold, + unacked_message_q = UAMQ0, + next_tag = NextTag0} = State) -> + % rabbit_log:info("handle stream deliveries ~w ~w", [QName, hd(MsgBins)]), + DeliveredAt = os:system_time(millisecond), + {NextTag, Bins, UAMQ} = + lists:foldl( + fun ({Offs, Bin}, {DTag, Bins, UAMQ}) -> + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content} = binary_to_term(Bin), + Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DTag, + redelivered = false, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + ok = rabbit_writer:send_command(WriterPid, Deliver, + Content), + {DTag +1, [Bin | Bins], + ?QUEUE:in({DTag, ConsumerTag, DeliveredAt, + {QPid, QName, Offs}}, UAMQ)} + end, {NextTag0, [], UAMQ0}, MsgBins), + % rabbit_log:info("handle stream deliveries next tag ~w", [NextTag]), + case GCThreshold of + undefined -> ok; + _ -> + rabbit_basic:maybe_gc_large_msg(Bins, GCThreshold) + end, + ?INCR_STATS(queue_stats, QName, NextTag - NextTag0, deliver, State), + State#ch{next_tag = NextTag, + unacked_message_q = UAMQ}. + handle_deliver(ConsumerTag, AckRequired, Msg = {QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl index e90740f472..d958bbcaba 100644 --- a/src/rabbit_looking_glass.erl +++ b/src/rabbit_looking_glass.erl @@ -19,39 +19,58 @@ -ignore_xref([{lg, trace, 4}]). -ignore_xref([{maps, from_list, 1}]). --export([boot/0]). --export([connections/0]). +-export([boot/0, + stop/0]). +-export([connections/0, + channels/0]). +-export([profile_channels/0]). boot() -> case os:getenv("RABBITMQ_TRACER") of false -> ok; Value -> - Input = parse_value(Value), + % Input = parse_value(Value), rabbit_log:info( "Enabling Looking Glass profiler, input value: ~p", - [Input] + [Value] ), {ok, _} = application:ensure_all_started(looking_glass), - lg:trace( - Input, - lg_file_tracer, - "traces.lz4", - maps:from_list([ - {mode, profile}, - {process_dump, true}, - {running, true}, - {send, true}] - ) - ) + ok end. -parse_value(Value) -> - [begin - [Mod, Fun] = string:tokens(C, ":"), - {callback, list_to_atom(Mod), list_to_atom(Fun)} - end || C <- string:tokens(Value, ",")]. +% parse_value(Value) -> +% [begin +% [Mod, Fun] = string:tokens(C, ":"), +% {callback, list_to_atom(Mod), list_to_atom(Fun)} +% end || C <- string:tokens(Value, ",")]. connections() -> Pids = [Pid || {{conns_sup, _}, Pid} <- ets:tab2list(ranch_server)], ['_', {scope, Pids}]. + +channels() -> + Pids = rabbit_channel:list_local(), + ['_', {scope, Pids}]. + +profile_channels() -> + lg:trace( + [ + rabbit_channel, + rabbit_amqqueue, + rabbit_stream2_queue, + osiris_segment, + rabbit_basic, + rabbit_writer, + lqueue + ], + lg_file_tracer, + "traces.lz4", + maps:from_list([ + {mode, profile}, + {process_dump, false}, + {send, false}] + )). + +stop() -> + lg:stop(). diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl index 7643ad0f46..68ceb89dc3 100644 --- a/src/rabbit_stream2_queue.erl +++ b/src/rabbit_stream2_queue.erl @@ -70,7 +70,7 @@ stream_entries(Name, Id, Str) -> stream_entries(Name, LeaderPid, #stream{credit = Credit, - start_offset = StartOffs, + start_offset = _StartOffs, listening_offset = LOffs, log = Seg0} = Str0, MsgIn) when Credit > 0 -> @@ -90,13 +90,11 @@ stream_entries(Name, LeaderPid, {Str0#stream{log = Seg}, MsgIn} end; {Records, Seg} -> - Msgs = [begin - Msg0 = binary_to_term(B), - Msg = rabbit_basic:add_header(<<"x-stream-offset">>, - long, O, Msg0), - {Name, LeaderPid, O, false, Msg} - end || {O, B} <- Records, - O >= StartOffs], + % Msgs = [begin + % R + % end || {O, _} = R <- Records, + % O >= StartOffs], + Msgs = Records, % rabbit_log:info("stream2 msgs out ~p", [Msgs]), % rabbit_log:info("stream entries got entries", [Entries0]), NumMsgs = length(Msgs), @@ -106,7 +104,7 @@ stream_entries(Name, LeaderPid, case Str#stream.credit < 1 of true -> %% we are done here - % rabbit_log:info("stream entries out ~w ~w", [Entries0, Msgs]), + % rabbit_log:info("stream entries out ~w ~w", [Msgs, Msgs]), {Str, MsgIn ++ Msgs}; false -> %% if there are fewer Msgs than Entries0 it means there were non-events @@ -199,36 +197,33 @@ handle_offset(#stream2_client{name = Name, % rabbit_log:info("handle_offset ~w", [_Offs]), %% offset isn't actually needed as we use the atomic to read the %% current committed - Readers = maps:map( - fun (Tag, Str0) -> - {Str, Msgs} = stream_entries(Name, Leader, Str0), - %% HACK for now, better to just return but - %% tricky with acks credits - %% that also evaluate the stream - gen_server:cast(self(), {stream_delivery, Tag, Msgs}), - Str - end, Readers0), - State#stream2_client{readers = Readers}. + {Readers, TagMsgs} = maps:fold( + fun (Tag, Str0, {Acc, TM}) -> + % rabbit_log:info("handle_offset for ~w", [Tag]), + {Str, Msgs} = stream_entries(Name, Leader, Str0), + %% HACK for now, better to just return but + %% tricky with acks credits + %% that also evaluate the stream + % gen_server:cast(self(), {stream_delivery, Tag, Msgs}), + {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]} + end, {#{}, []}, Readers0), + {State#stream2_client{readers = Readers}, TagMsgs}. credit(#stream2_client{name = Name, leader = Leader, readers = Readers0} = State, Tag, Credit) -> % rabbit_log:info("stream2 credit ~w ~w", [Credit, Tag]), - Readers = case Readers0 of - #{Tag := #stream{credit = Credit0} = Str0} -> - % rabbit_log:info("stream2 credit yeah ~w ~w", [Credit, Tag]), - Str1 = Str0#stream{credit = Credit0 + Credit}, - {Str, Msgs} = stream_entries(Name, Leader, Str1), - %% HACK for now, better to just return but - %% tricky with acks credits - %% that also evaluate the stream - gen_server:cast(self(), {stream_delivery, Tag, Msgs}), - Readers0#{Tag => Str}; - _ -> - Readers0 - end, - {ok, State#stream2_client{readers = Readers}}. + {Readers, TagMsgs} = case Readers0 of + #{Tag := #stream{credit = Credit0} = Str0} -> + % rabbit_log:info("stream2 credit yeah ~w ~w", [Credit, Tag]), + Str1 = Str0#stream{credit = Credit0 + Credit}, + {Str, Msgs0} = stream_entries(Name, Leader, Str1), + {Readers0#{Tag => Str}, {Tag, Leader, Msgs0}}; + _ -> + {Readers0, []} + end, + {ok, TagMsgs, State#stream2_client{readers = Readers}}. %% MGMT @@ -240,12 +235,9 @@ declare(Q0) -> ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()], N = ra_lib:derive_safe_string(atom_to_list(Name), 8), - Dir = filename:join(rabbit_mnesia:dir(), "streams"), - file:make_dir(Dir), % rabbit_log:info("Declare stream2 in ~s", [Dir]), - Conf = #{dir => Dir, - reference => QName, - name => N}, + Conf = #{name => N, + reference => QName}, {ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf), Q1 = amqqueue:set_slave_pids( amqqueue:set_pid(Q0, LeaderPid), ReplicaPids), |