summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2022-08-09 11:14:08 +0200
committerGitHub <noreply@github.com>2022-08-09 11:14:08 +0200
commit0e4b68b6095a800865fbcd604b4f0180788e9c8e (patch)
treef025327bbad11dab7f45181145633ddfb3bca415
parentc2eb78b1777d4dd131889b84d8f7b064d31f0ed6 (diff)
parent73d067f4522793f83b00695e7c0dacae7b753a00 (diff)
downloadrabbitmq-server-git-0e4b68b6095a800865fbcd604b4f0180788e9c8e.tar.gz
Merge pull request #5466 from rabbitmq/mergify/bp/v3.11.x/pr-5427
Add StreamStats command to stream protocol (backport #5427)
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc27
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl214
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl51
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl3
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl18
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl23
-rw-r--r--deps/rabbitmq_stream_common/include/rabbit_stream.hrl1
-rw-r--r--deps/rabbitmq_stream_common/src/rabbit_stream_core.erl58
-rw-r--r--deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl4
9 files changed, 284 insertions, 115 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 2f02a95e9f..1223ce3949 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -226,6 +226,11 @@ used to make the difference between a request (0) and a response (1). Example fo
|0x001b
|Yes
+|<<streamstats>>
+|Client
+|0x001c
+|Yes
+
|===
=== DeclarePublisher
@@ -370,7 +375,7 @@ Deliver => Key Version SubscriptionId CommittedOffset OsirisChunk
Key => uint16 // 0x0008
Version => uint16
SubscriptionId => uint8
- CommittedOffset => uint64
+ CommittedChunkId => uint64
OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages
MagicVersion => int8
ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot
@@ -702,6 +707,26 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma
MaxVersion => uint16
```
+=== StreamStats
+
+```
+StreamStatsRequest => Key Version CorrelationId Stream
+ Key => uint16 // 0x001c
+ Version => uint16
+ CorrelationId => uint32
+ Stream => string
+
+StreamStatsResponse => Key Version CorrelationId ResponseCode Stats
+ Key => uint16 // 0x801c
+ Version => uint16
+ CorrelationId => uint32
+ ResponseCode => uint16
+ Stats => [Statistic]
+ Statistic => Key Value
+ Key => string
+ Value => int64
+```
+
== Authentication
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index da93d89d94..e13483ceb5 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -33,6 +33,7 @@
delete_super_stream/3,
lookup_leader/2,
lookup_local_member/2,
+ lookup_member/2,
topology/2,
route/3,
partitions/2,
@@ -100,6 +101,12 @@ lookup_leader(VirtualHost, Stream) ->
lookup_local_member(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
+-spec lookup_member(binary(), binary()) ->
+ {ok, pid()} | {error, not_found} |
+ {error, not_available}.
+lookup_member(VirtualHost, Stream) ->
+ gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}).
+
-spec topology(binary(), binary()) ->
{ok,
#{leader_node => undefined | pid(),
@@ -292,119 +299,109 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
{reply, {error, Error}, State}
end;
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
+ LeaderPid = amqqueue:get_pid(Q),
+ case process_alive(LeaderPid) of
true ->
- LeaderPid = amqqueue:get_pid(Q),
- case process_alive(LeaderPid) of
- true ->
- {ok, LeaderPid};
- false ->
- case leader_from_members(Q) of
- {ok, Pid} ->
- {ok, Pid};
- _ ->
- {error, not_available}
- end
- end;
- _ ->
- {error, not_found}
+ {ok, LeaderPid};
+ false ->
+ case leader_from_members(Q) of
+ {ok, Pid} ->
+ {ok, Pid};
+ _ ->
+ {error, not_available}
+ end
end;
- {error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, not_found};
- _ ->
- {error, not_available}
- end
+ R ->
+ R
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
- true ->
- #{name := StreamName} = amqqueue:get_type_state(Q),
- % FIXME check if pid is alive in case of stale information
- case rabbit_stream_coordinator:local_pid(StreamName)
- of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {error, timeout} ->
- {error, not_available};
- _ ->
- {error, not_available}
- end;
- _ ->
- {error, not_found}
- end;
- {error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, not_found};
+ #{name := StreamName} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
+ case rabbit_stream_coordinator:local_pid(StreamName) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {error, timeout} ->
+ {error, not_available};
_ ->
{error, not_available}
- end
+ end;
+ R ->
+ R
end,
{reply, Res, State};
-handle_call({topology, VirtualHost, Stream}, _From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+handle_call({lookup_member, VirtualHost, Stream}, _From, State) ->
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
- true ->
- QState = amqqueue:get_type_state(Q),
- #{name := StreamName} = QState,
+ #{name := StreamName} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
+ case rabbit_stream_coordinator:local_pid(StreamName) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ _ ->
case rabbit_stream_coordinator:members(StreamName) of
{ok, Members} ->
- {ok,
- maps:fold(fun (_Node, {undefined, _Role},
- Acc) ->
- Acc;
- (LeaderNode, {_Pid, writer},
- Acc) ->
- Acc#{leader_node =>
- LeaderNode};
- (ReplicaNode, {_Pid, replica},
- Acc) ->
- #{replica_nodes :=
- ReplicaNodes} =
- Acc,
- Acc#{replica_nodes =>
- ReplicaNodes
- ++ [ReplicaNode]};
- (_Node, _, Acc) ->
- Acc
- end,
- #{leader_node => undefined,
- replica_nodes => []},
- Members)};
+ case lists:search(fun ({undefined, _Role}) ->
+ false;
+ ({P, _Role})
+ when is_pid(P) ->
+ process_alive(P);
+ (_) ->
+ false
+ end,
+ maps:values(Members))
+ of
+ {value, {Pid, _Role}} ->
+ {ok, Pid};
+ _ ->
+ {error, not_available}
+ end;
_ ->
- {error, stream_not_available}
- end;
+ {error, not_available}
+ end
+ end;
+ R ->
+ R
+ end,
+ {reply, Res, State};
+handle_call({topology, VirtualHost, Stream}, _From, State) ->
+ Res = case lookup_stream(VirtualHost, Stream) of
+ {ok, Q} ->
+ QState = amqqueue:get_type_state(Q),
+ #{name := StreamName} = QState,
+ case rabbit_stream_coordinator:members(StreamName) of
+ {ok, Members} ->
+ {ok,
+ maps:fold(fun (_Node, {undefined, _Role}, Acc) ->
+ Acc;
+ (LeaderNode, {_Pid, writer}, Acc) ->
+ Acc#{leader_node => LeaderNode};
+ (ReplicaNode, {_Pid, replica}, Acc) ->
+ #{replica_nodes := ReplicaNodes} =
+ Acc,
+ Acc#{replica_nodes =>
+ ReplicaNodes
+ ++ [ReplicaNode]};
+ (_Node, _, Acc) ->
+ Acc
+ end,
+ #{leader_node => undefined,
+ replica_nodes => []},
+ Members)};
_ ->
- {error, stream_not_found}
+ {error, not_available}
end;
{error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, stream_not_found};
- _ ->
- {error, stream_not_available}
- end
+ {error, stream_not_found};
+ {error, not_available} ->
+ {error, stream_not_available};
+ R ->
+ R
end,
{reply, Res, State};
handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
@@ -837,6 +834,28 @@ delete_super_stream_exchange(VirtualHost, Name, Username) ->
{error, validation_failed}
end.
+lookup_stream(VirtualHost, Stream) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Stream},
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ case is_stream_queue(Q) of
+ true ->
+ {ok, Q};
+ _ ->
+ {error, not_found}
+ end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
+ end
+ end.
+
leader_from_members(Q) ->
QState = amqqueue:get_type_state(Q),
#{name := StreamName} = QState,
@@ -860,7 +879,12 @@ process_alive(Pid) ->
CurrentNode ->
is_process_alive(Pid);
OtherNode ->
- rpc:call(OtherNode, erlang, is_process_alive, [Pid], 10000)
+ case rpc:call(OtherNode, erlang, is_process_alive, [Pid], 10000) of
+ B when is_boolean(B) ->
+ B;
+ _ ->
+ false
+ end
end.
is_stream_queue(Q) ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 2db2e24649..bcd6c69618 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1734,7 +1734,6 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{query_publisher_sequence, Reference, Stream}}) ->
- % FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Sequence} =
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
kind = queue,
@@ -2642,6 +2641,52 @@ handle_frame_post_auth(Transport,
process_client_command_versions(Connection0, CommandVersions),
{Connection1, State};
handle_frame_post_auth(Transport,
+ #stream_connection{socket = S,
+ virtual_host = VirtualHost,
+ user = User} =
+ Connection,
+ State,
+ {request, CorrelationId, {stream_stats, Stream}}) ->
+ QueueResource =
+ #resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ Response =
+ case rabbit_stream_utils:check_read_permitted(QueueResource, User,
+ #{})
+ of
+ ok ->
+ case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of
+ {error, not_available} ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_NOT_AVAILABLE,
+ 1),
+ {stream_stats, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE,
+ #{}};
+ {error, not_found} ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_DOES_NOT_EXIST,
+ 1),
+ {stream_stats, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
+ #{}};
+ {ok, MemberPid} ->
+ StreamStats =
+ maps:fold(fun(K, V, Acc) ->
+ Acc#{atom_to_binary(K) => V}
+ end,
+ #{}, osiris:get_stats(MemberPid)),
+ {stream_stats, ?RESPONSE_CODE_OK, StreamStats}
+ end;
+ error ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?ACCESS_REFUSED,
+ 1),
+ {stream_stats, ?RESPONSE_CODE_ACCESS_REFUSED, #{}}
+ end,
+ Frame = rabbit_stream_core:frame({response, CorrelationId, Response}),
+ send(Transport, S, Frame),
+ {Connection, State};
+handle_frame_post_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
{request, CorrelationId,
@@ -3187,7 +3232,7 @@ send_file_callback(?VERSION_2,
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
Size) ->
FrameSize = 2 + 2 + 1 + 8 + Size,
- CommittedOffset =
+ CommittedChunkId =
case osiris_log:committed_offset(Log) of
undefined -> 0;
R -> R
@@ -3198,7 +3243,7 @@ send_file_callback(?VERSION_2,
?COMMAND_DELIVER:15,
?VERSION_2:16,
SubscriptionId:8/unsigned,
- CommittedOffset:64>>,
+ CommittedChunkId:64>>,
Transport:send(S, FrameBeginning),
atomics:add(Counter, 1, Size),
increase_messages_consumed(Counters, NumEntries),
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index c9b6e5f48a..55e1c71077 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -275,4 +275,5 @@ command_versions() ->
{close, ?VERSION_1, ?VERSION_1},
{heartbeat, ?VERSION_1, ?VERSION_1},
{route, ?VERSION_1, ?VERSION_1},
- {partitions, ?VERSION_1, ?VERSION_1}].
+ {partitions, ?VERSION_1, ?VERSION_1},
+ {stream_stats, ?VERSION_1, ?VERSION_1}].
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 1262e767df..f6ca661ac1 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -440,8 +440,10 @@ test_server(Transport, Config) ->
C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11),
C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12),
- C14 = test_delete_stream(Transport, S, Stream, C13),
- _C15 = test_close(Transport, S, C14),
+ C14 = test_stream_stats(Transport, S, Stream, C13),
+
+ C15 = test_delete_stream(Transport, S, Stream, C14),
+ _C16 = test_close(Transport, S, C15),
closed = wait_for_socket_close(Transport, S, 10),
ok.
@@ -620,6 +622,18 @@ test_exchange_command_versions(Transport, S, C0) ->
Cmd),
C.
+test_stream_stats(Transport, S, Stream, C0) ->
+ SICmd = {request, 1, {stream_stats, Stream}},
+ SIFrame = rabbit_stream_core:frame(SICmd),
+ ok = Transport:send(S, SIFrame),
+ {Cmd, C} = receive_commands(Transport, S, C0),
+ ?assertEqual({response, 1,
+ {stream_stats, ?RESPONSE_CODE_OK,
+ #{<<"first_chunk_id">> => 0,
+ <<"committed_chunk_id">> => 1}}},
+ Cmd),
+ C.
+
test_close(Transport, S, C0) ->
CloseReason = <<"OK">>,
CloseFrame =
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
index 0a5eb2b7d3..ede66b2d52 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -18,7 +18,10 @@ all() ->
groups() ->
[{non_parallel_tests, [],
- [manage_super_stream, lookup_leader, partition_index]}].
+ [manage_super_stream,
+ lookup_leader,
+ lookup_member,
+ partition_index]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@@ -84,6 +87,17 @@ lookup_leader(Config) ->
?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
+lookup_member(Config) ->
+ Stream = <<"stream_manager_lookup_member_stream">>,
+ ?assertMatch({ok, _}, create_stream(Config, Stream)),
+
+ {ok, Pid} = lookup_member(Config, Stream),
+ ?assert(is_pid(Pid)),
+
+ ?assertEqual({error, not_found}, lookup_member(Config, <<"foo">>)),
+
+ ?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
+
manage_super_stream(Config) ->
% create super stream
?assertEqual(ok,
@@ -215,6 +229,13 @@ lookup_leader(Config, Name) ->
lookup_leader,
[<<"/">>, Name]).
+lookup_member(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ lookup_member,
+ [<<"/">>, Name]).
+
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
index a9c20101ee..5f46cd8bff 100644
--- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
@@ -25,6 +25,7 @@
-define(COMMAND_PARTITIONS, 25).
-define(COMMAND_CONSUMER_UPDATE, 26).
-define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27).
+-define(COMMAND_STREAM_STATS, 28).
-define(REQUEST, 0).
-define(RESPONSE, 1).
diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
index 41f99b2bd5..da0c0c3fba 100644
--- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
+++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
@@ -75,7 +75,7 @@
{deliver, subscription_id(), Chunk :: binary()} |
{deliver_v2,
subscription_id(),
- CommittedOffset :: osiris:offset(),
+ CommittedChunkId :: osiris:offset(),
Chunk :: binary()} |
{credit, subscription_id(), Credit :: non_neg_integer()} |
{metadata_update, stream_name(), response_code()} |
@@ -108,7 +108,8 @@
{consumer_update, subscription_id(), active()} |
{exchange_command_versions,
[{Command :: atom(), MinVersion :: command_version(),
- MaxVersion :: command_version()}]}} |
+ MaxVersion :: command_version()}]} |
+ {stream_stats, Stream :: binary()}} |
{response, correlation_id(),
{declare_publisher |
delete_publisher |
@@ -138,7 +139,8 @@
{consumer_update, response_code(), none | offset_spec()} |
{exchange_command_versions, response_code(),
[{Command :: atom(), MinVersion :: command_version(),
- MaxVersion :: command_version()}]}} |
+ MaxVersion :: command_version()}]} |
+ {stream_stats, response_code(), Stats :: #{binary() => integer()}}} |
{unknown, binary()}.
-spec init(term()) -> state().
@@ -243,12 +245,12 @@ frame({deliver, SubscriptionId, Chunk}) ->
?VERSION_1:16,
SubscriptionId:8>>,
Chunk]);
-frame({deliver_v2, SubscriptionId, CommittedOffset, Chunk}) ->
+frame({deliver_v2, SubscriptionId, CommittedChunkId, Chunk}) ->
wrap_in_frame([<<?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_2:16,
SubscriptionId:8,
- CommittedOffset:64>>,
+ CommittedChunkId:64>>,
Chunk]);
frame({metadata_update, Stream, ResponseCode}) ->
StreamSize = byte_size(Stream),
@@ -471,7 +473,18 @@ response_body({exchange_command_versions = Tag, Code,
end,
[], CommandVersions),
{command_id(Tag),
- [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}.
+ [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]};
+response_body({stream_stats = Tag, Code, Stats}) ->
+ Init = <<Code:16, (maps:size(Stats)):32>>,
+ {command_id(Tag),
+ maps:fold(fun(Key, Value, Acc) ->
+ KeySize = byte_size(Key),
+ <<Acc/binary,
+ KeySize:16,
+ Key:KeySize/binary,
+ Value:64/signed>>
+ end,
+ Init, Stats)}.
request_body({declare_publisher = Tag,
PublisherId,
@@ -576,7 +589,9 @@ request_body({exchange_command_versions = Tag, CommandVersions}) ->
end,
[], CommandVersions),
CommandVersionsLength = length(CommandVersions),
- {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}.
+ {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]};
+request_body({stream_stats = Tag, Stream}) ->
+ {Tag, <<?STRING(Stream)>>}.
append_data(Prev, Data) when is_binary(Prev) ->
[Prev, Data];
@@ -622,9 +637,9 @@ parse_request(<<?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_2:16,
SubscriptionId:8,
- CommittedOffset:64,
+ CommittedChunkId:64,
Chunk/binary>>) ->
- {deliver_v2, SubscriptionId, CommittedOffset, Chunk};
+ {deliver_v2, SubscriptionId, CommittedChunkId, Chunk};
parse_request(<<?REQUEST:1,
?COMMAND_CREDIT:15,
?VERSION_1:16,
@@ -843,6 +858,12 @@ parse_request(<<?REQUEST:1,
CommandVersionsBin/binary>>) ->
CommandVersions = parse_command_versions(CommandVersionsBin),
request(CorrelationId, {exchange_command_versions, CommandVersions});
+parse_request(<<?REQUEST:1,
+ ?COMMAND_STREAM_STATS:15,
+ ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(StreamSize, Stream)>>) ->
+ request(CorrelationId, {stream_stats, Stream});
parse_request(Bin) ->
{unknown, Bin}.
@@ -928,7 +949,11 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS,
<<ResponseCode:16, _CommandVersionsCount:32,
CommandVersionsBin/binary>>) ->
CommandVersions = parse_command_versions(CommandVersionsBin),
- {exchange_command_versions, ResponseCode, CommandVersions}.
+ {exchange_command_versions, ResponseCode, CommandVersions};
+parse_response_body(?COMMAND_STREAM_STATS,
+ <<ResponseCode:16, _Count:32, StatsBin/binary>>) ->
+ Info = parse_int_map(StatsBin, #{}),
+ {stream_stats, ResponseCode, Info}.
offset_spec(OffsetType, OffsetValueBin) ->
case OffsetType of
@@ -1000,6 +1025,11 @@ parse_map(<<?STRING(KeySize, Key), ?STRING(ValSize, Value),
Acc) ->
parse_map(Rem, Acc#{Key => Value}).
+parse_int_map(<<>>, Acc) ->
+ Acc;
+parse_int_map(<<?STRING(KeySize, Key), Value:64, Rem/binary>>, Acc) ->
+ parse_int_map(Rem, Acc#{Key => Value}).
+
generate_map(Map) ->
maps:fold(fun(K, V, Acc) -> [<<?STRING(K), ?STRING(V)>> | Acc] end,
[], Map).
@@ -1079,7 +1109,9 @@ command_id(partitions) ->
command_id(consumer_update) ->
?COMMAND_CONSUMER_UPDATE;
command_id(exchange_command_versions) ->
- ?COMMAND_EXCHANGE_COMMAND_VERSIONS.
+ ?COMMAND_EXCHANGE_COMMAND_VERSIONS;
+command_id(stream_stats) ->
+ ?COMMAND_STREAM_STATS.
parse_command_id(?COMMAND_DECLARE_PUBLISHER) ->
declare_publisher;
@@ -1134,7 +1166,9 @@ parse_command_id(?COMMAND_PARTITIONS) ->
parse_command_id(?COMMAND_CONSUMER_UPDATE) ->
consumer_update;
parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) ->
- exchange_command_versions.
+ exchange_command_versions;
+parse_command_id(?COMMAND_STREAM_STATS) ->
+ stream_stats.
element_index(Element, List) ->
element_index(Element, List, 0).
diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
index a37daceda8..8e40b0f8d6 100644
--- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
+++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
@@ -108,6 +108,8 @@ roundtrip(_Config) ->
test_roundtrip({request, 99,
{exchange_command_versions,
[{deliver, ?VERSION_1, ?VERSION_1}]}}),
+ test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}),
+
%% RESPONSES
[test_roundtrip({response, 99, {Tag, 53}})
|| Tag
@@ -140,6 +142,8 @@ roundtrip(_Config) ->
test_roundtrip({response, 99,
{exchange_command_versions, 1,
[{publish, ?VERSION_1, ?VERSION_1}]}}),
+ test_roundtrip({response, 99,
+ {stream_stats, 1, #{<<"committed_offset">> => 42}}}),
ok.
roundtrip_metadata(_Config) ->