summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-01-21 15:51:06 +0000
committerkjnilsson <knilsson@pivotal.io>2020-01-21 15:52:10 +0000
commitdb92a4a4cbc3f9ef293cbbcabc3e99085cb22761 (patch)
treec33e2f592311fe9a7a5565b58b22bfb4e8492dd7
parent9e9093bedcf022a2f95a44171338352a23ca1268 (diff)
downloadrabbitmq-server-git-db92a4a4cbc3f9ef293cbbcabc3e99085cb22761.tar.gz
optimisations
-rw-r--r--Makefile2
-rw-r--r--src/rabbit_amqqueue.erl17
-rw-r--r--src/rabbit_channel.erl89
-rw-r--r--src/rabbit_looking_glass.erl59
-rw-r--r--src/rabbit_stream2_queue.erl68
5 files changed, 149 insertions, 86 deletions
diff --git a/Makefile b/Makefile
index 7680872cd4..902cf08755 100644
--- a/Makefile
+++ b/Makefile
@@ -96,7 +96,7 @@ define PROJECT_ENV
{msg_store_io_batch_size, 4096},
%% see rabbitmq-server#143,
%% rabbitmq-server#949, rabbitmq-server#1098
- {credit_flow_default_credit, {400, 200}},
+ {credit_flow_default_credit, {4000, 2000}},
{quorum_commands_soft_limit, 256},
{quorum_cluster_size, 5},
%% see rabbitmq-server#248
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 31eb9229fc..1658952ade 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1591,13 +1591,14 @@ ack(QPid, {CTag, QName, MsgIds}, ChPid, QueueStates) when ?IS_CLASSIC(QPid) ->
#{QName := QState0}
when element(1, QState0) == stream2_client ->
%% stream2 cqueue
- {ok, QState} = rabbit_stream2_queue:credit(QState0, CTag,
- length(MsgIds)),
- maps:put(QName, QState, QueueStates);
+ {ok, Messages, QState} = rabbit_stream2_queue:credit(QState0, CTag,
+ length(MsgIds)),
+ {maps:put(QName, QState, QueueStates), Messages};
_ ->
%% classic
- delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}),
- QueueStates
+ delegate:invoke_no_result(QPid, {gen_server2, cast,
+ [{ack, MsgIds, ChPid}]}),
+ {QueueStates, []}
end;
ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
when ?IS_QUORUM(QPid) ->
@@ -1610,13 +1611,13 @@ ack({Name, _} = QPid, {CTag, MsgIds}, _ChPid, QuorumStates)
%% rather than an explicit msgid
{ok, QState} = rabbit_stream_queue:credit(QState0, CTag,
length(MsgIds)),
- maps:put(Name, QState, QuorumStates);
+ {maps:put(Name, QState, QuorumStates), []};
#{Name := QState0} ->
{ok, QState} = rabbit_quorum_queue:ack(CTag, MsgIds, QState0),
- maps:put(Name, QState, QuorumStates);
+ {maps:put(Name, QState, QuorumStates), []};
_ ->
%% queue was not found
- QuorumStates
+ {QuorumStates, []}
end.
-spec reject(pid() | amqqueue:ra_server_id(),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8f0ca48377..dc3d20f587 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -840,10 +840,20 @@ handle_info({ra_event, {Name, _} = From, EvtBody} = Evt,
handle_info({osiris_offset, QName, Offs},
#ch{queue_states = QueueStates0} = State) ->
+ % rabbit_log:info("osiris_offset ~w ~w ~w", [QName, Offs, QueueStates0]),
case QueueStates0 of
#{QName := QState0} when ?IS_STREAM2(QState0) ->
- QState = rabbit_stream2_queue:handle_offset(QState0, Offs),
- noreply(State#ch{queue_states = QueueStates0#{QName => QState}});
+ {QState, TagMsgs} = rabbit_stream2_queue:handle_offset(QState0, Offs),
+ % rabbit_log:info("osiris_offset tag msgs ~w", [TagMsgs]),
+ noreply(
+ lists:foldl(
+ fun({Tag, LeaderPid, OffsetMsgs}, Acc) ->
+ handle_stream_deliveries(Tag, LeaderPid,
+ QName, OffsetMsgs, Acc)
+ end,
+ State#ch{queue_states = QueueStates0#{QName => QState}},
+ TagMsgs)
+ );
_ ->
noreply(State)
end;
@@ -951,6 +961,7 @@ handle_post_hibernate(State0) ->
terminate(_Reason,
State = #ch{cfg = #conf{user = #user{username = Username}}}) ->
+ rabbit_log:warning("terminate with ~w", [_Reason]),
{_Res, _State1} = notify_queues(State),
pg_local:leave(rabbit_channels, self()),
rabbit_event:if_enabled(State, #ch.stats_timer,
@@ -2121,24 +2132,28 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end.
%% NB: Acked is in youngest-first order
-ack(Acked, State = #ch{queue_names = QNames,
- queue_states = QueueStates0}) ->
- QueueStates =
- foreach_per_queue(
- fun ({QPid, QName, CTag}, MsgIds, Acc0) ->
- Acc = rabbit_amqqueue:ack(QPid, {CTag, QName, MsgIds}, self(), Acc0),
- incr_queue_stats(QPid, QNames, MsgIds, State),
- Acc
- end, Acked, QueueStates0),
+ack(Acked, State) ->
ok = notify_limiter(State#ch.limiter, Acked),
- State#ch{queue_states = QueueStates}.
-
-incr_queue_stats(QPid, QNames, MsgIds, State) ->
- case maps:find(qpid_to_ref(QPid), QNames) of
- {ok, QName} -> Count = length(MsgIds),
- ?INCR_STATS(queue_stats, QName, Count, ack, State);
- error -> ok
- end.
+ foreach_per_queue(
+ fun ({QPid, QName, CTag}, MsgIds,
+ #ch{queue_states = QS0} = Acc) ->
+ {QS, {_, _, OffsetMsgs}} = rabbit_amqqueue:ack(QPid,
+ {CTag, QName, MsgIds},
+ self(), QS0),
+ incr_queue_stats(QName, MsgIds, State),
+ handle_stream_deliveries(CTag, QPid,
+ QName, OffsetMsgs,
+ Acc#ch{queue_states = QS})
+ end, Acked, State).
+ % lists:foldl(
+ % fun({Tag, LeaderPid, OffsetMsgs}, Acc) ->
+ % handle_stream_deliveries(Tag, LeaderPid,
+ % QName, OffsetMsgs, Acc)
+ % end, State#ch{queue_states = QueueStates}, TagMessages).
+
+incr_queue_stats(QName, MsgIds, State) ->
+ Count = length(MsgIds),
+ ?INCR_STATS(queue_stats, QName, Count, ack, State).
%% {Msgs, Acks}
%%
@@ -2739,6 +2754,42 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName).
+handle_stream_deliveries(_ConsumerTag, _QPid, _QName, [], State) ->
+ State;
+handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins,
+ #ch{cfg = #conf{writer_pid = WriterPid},
+ writer_gc_threshold = GCThreshold,
+ unacked_message_q = UAMQ0,
+ next_tag = NextTag0} = State) ->
+ % rabbit_log:info("handle stream deliveries ~w ~w", [QName, hd(MsgBins)]),
+ DeliveredAt = os:system_time(millisecond),
+ {NextTag, Bins, UAMQ} =
+ lists:foldl(
+ fun ({Offs, Bin}, {DTag, Bins, UAMQ}) ->
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content} = binary_to_term(Bin),
+ Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DTag,
+ redelivered = false,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ ok = rabbit_writer:send_command(WriterPid, Deliver,
+ Content),
+ {DTag +1, [Bin | Bins],
+ ?QUEUE:in({DTag, ConsumerTag, DeliveredAt,
+ {QPid, QName, Offs}}, UAMQ)}
+ end, {NextTag0, [], UAMQ0}, MsgBins),
+ % rabbit_log:info("handle stream deliveries next tag ~w", [NextTag]),
+ case GCThreshold of
+ undefined -> ok;
+ _ ->
+ rabbit_basic:maybe_gc_large_msg(Bins, GCThreshold)
+ end,
+ ?INCR_STATS(queue_stats, QName, NextTag - NextTag0, deliver, State),
+ State#ch{next_tag = NextTag,
+ unacked_message_q = UAMQ}.
+
handle_deliver(ConsumerTag, AckRequired,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl
index e90740f472..d958bbcaba 100644
--- a/src/rabbit_looking_glass.erl
+++ b/src/rabbit_looking_glass.erl
@@ -19,39 +19,58 @@
-ignore_xref([{lg, trace, 4}]).
-ignore_xref([{maps, from_list, 1}]).
--export([boot/0]).
--export([connections/0]).
+-export([boot/0,
+ stop/0]).
+-export([connections/0,
+ channels/0]).
+-export([profile_channels/0]).
boot() ->
case os:getenv("RABBITMQ_TRACER") of
false ->
ok;
Value ->
- Input = parse_value(Value),
+ % Input = parse_value(Value),
rabbit_log:info(
"Enabling Looking Glass profiler, input value: ~p",
- [Input]
+ [Value]
),
{ok, _} = application:ensure_all_started(looking_glass),
- lg:trace(
- Input,
- lg_file_tracer,
- "traces.lz4",
- maps:from_list([
- {mode, profile},
- {process_dump, true},
- {running, true},
- {send, true}]
- )
- )
+ ok
end.
-parse_value(Value) ->
- [begin
- [Mod, Fun] = string:tokens(C, ":"),
- {callback, list_to_atom(Mod), list_to_atom(Fun)}
- end || C <- string:tokens(Value, ",")].
+% parse_value(Value) ->
+% [begin
+% [Mod, Fun] = string:tokens(C, ":"),
+% {callback, list_to_atom(Mod), list_to_atom(Fun)}
+% end || C <- string:tokens(Value, ",")].
connections() ->
Pids = [Pid || {{conns_sup, _}, Pid} <- ets:tab2list(ranch_server)],
['_', {scope, Pids}].
+
+channels() ->
+ Pids = rabbit_channel:list_local(),
+ ['_', {scope, Pids}].
+
+profile_channels() ->
+ lg:trace(
+ [
+ rabbit_channel,
+ rabbit_amqqueue,
+ rabbit_stream2_queue,
+ osiris_segment,
+ rabbit_basic,
+ rabbit_writer,
+ lqueue
+ ],
+ lg_file_tracer,
+ "traces.lz4",
+ maps:from_list([
+ {mode, profile},
+ {process_dump, false},
+ {send, false}]
+ )).
+
+stop() ->
+ lg:stop().
diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl
index 7643ad0f46..68ceb89dc3 100644
--- a/src/rabbit_stream2_queue.erl
+++ b/src/rabbit_stream2_queue.erl
@@ -70,7 +70,7 @@ stream_entries(Name, Id, Str) ->
stream_entries(Name, LeaderPid,
#stream{credit = Credit,
- start_offset = StartOffs,
+ start_offset = _StartOffs,
listening_offset = LOffs,
log = Seg0} = Str0, MsgIn)
when Credit > 0 ->
@@ -90,13 +90,11 @@ stream_entries(Name, LeaderPid,
{Str0#stream{log = Seg}, MsgIn}
end;
{Records, Seg} ->
- Msgs = [begin
- Msg0 = binary_to_term(B),
- Msg = rabbit_basic:add_header(<<"x-stream-offset">>,
- long, O, Msg0),
- {Name, LeaderPid, O, false, Msg}
- end || {O, B} <- Records,
- O >= StartOffs],
+ % Msgs = [begin
+ % R
+ % end || {O, _} = R <- Records,
+ % O >= StartOffs],
+ Msgs = Records,
% rabbit_log:info("stream2 msgs out ~p", [Msgs]),
% rabbit_log:info("stream entries got entries", [Entries0]),
NumMsgs = length(Msgs),
@@ -106,7 +104,7 @@ stream_entries(Name, LeaderPid,
case Str#stream.credit < 1 of
true ->
%% we are done here
- % rabbit_log:info("stream entries out ~w ~w", [Entries0, Msgs]),
+ % rabbit_log:info("stream entries out ~w ~w", [Msgs, Msgs]),
{Str, MsgIn ++ Msgs};
false ->
%% if there are fewer Msgs than Entries0 it means there were non-events
@@ -199,36 +197,33 @@ handle_offset(#stream2_client{name = Name,
% rabbit_log:info("handle_offset ~w", [_Offs]),
%% offset isn't actually needed as we use the atomic to read the
%% current committed
- Readers = maps:map(
- fun (Tag, Str0) ->
- {Str, Msgs} = stream_entries(Name, Leader, Str0),
- %% HACK for now, better to just return but
- %% tricky with acks credits
- %% that also evaluate the stream
- gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
- Str
- end, Readers0),
- State#stream2_client{readers = Readers}.
+ {Readers, TagMsgs} = maps:fold(
+ fun (Tag, Str0, {Acc, TM}) ->
+ % rabbit_log:info("handle_offset for ~w", [Tag]),
+ {Str, Msgs} = stream_entries(Name, Leader, Str0),
+ %% HACK for now, better to just return but
+ %% tricky with acks credits
+ %% that also evaluate the stream
+ % gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
+ {Acc#{Tag => Str}, [{Tag, Leader, Msgs} | TM]}
+ end, {#{}, []}, Readers0),
+ {State#stream2_client{readers = Readers}, TagMsgs}.
credit(#stream2_client{name = Name,
leader = Leader,
readers = Readers0} = State, Tag, Credit) ->
% rabbit_log:info("stream2 credit ~w ~w", [Credit, Tag]),
- Readers = case Readers0 of
- #{Tag := #stream{credit = Credit0} = Str0} ->
- % rabbit_log:info("stream2 credit yeah ~w ~w", [Credit, Tag]),
- Str1 = Str0#stream{credit = Credit0 + Credit},
- {Str, Msgs} = stream_entries(Name, Leader, Str1),
- %% HACK for now, better to just return but
- %% tricky with acks credits
- %% that also evaluate the stream
- gen_server:cast(self(), {stream_delivery, Tag, Msgs}),
- Readers0#{Tag => Str};
- _ ->
- Readers0
- end,
- {ok, State#stream2_client{readers = Readers}}.
+ {Readers, TagMsgs} = case Readers0 of
+ #{Tag := #stream{credit = Credit0} = Str0} ->
+ % rabbit_log:info("stream2 credit yeah ~w ~w", [Credit, Tag]),
+ Str1 = Str0#stream{credit = Credit0 + Credit},
+ {Str, Msgs0} = stream_entries(Name, Leader, Str1),
+ {Readers0#{Tag => Str}, {Tag, Leader, Msgs0}};
+ _ ->
+ {Readers0, []}
+ end,
+ {ok, TagMsgs, State#stream2_client{readers = Readers}}.
%% MGMT
@@ -240,12 +235,9 @@ declare(Q0) ->
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()],
N = ra_lib:derive_safe_string(atom_to_list(Name), 8),
- Dir = filename:join(rabbit_mnesia:dir(), "streams"),
- file:make_dir(Dir),
% rabbit_log:info("Declare stream2 in ~s", [Dir]),
- Conf = #{dir => Dir,
- reference => QName,
- name => N},
+ Conf = #{name => N,
+ reference => QName},
{ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf),
Q1 = amqqueue:set_slave_pids(
amqqueue:set_pid(Q0, LeaderPid), ReplicaPids),