diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2022-08-09 11:14:08 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-08-09 11:14:08 +0200 |
commit | 0e4b68b6095a800865fbcd604b4f0180788e9c8e (patch) | |
tree | f025327bbad11dab7f45181145633ddfb3bca415 | |
parent | c2eb78b1777d4dd131889b84d8f7b064d31f0ed6 (diff) | |
parent | 73d067f4522793f83b00695e7c0dacae7b753a00 (diff) | |
download | rabbitmq-server-git-0e4b68b6095a800865fbcd604b4f0180788e9c8e.tar.gz |
Merge pull request #5466 from rabbitmq/mergify/bp/v3.11.x/pr-5427
Add StreamStats command to stream protocol (backport #5427)
9 files changed, 284 insertions, 115 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 2f02a95e9f..1223ce3949 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -226,6 +226,11 @@ used to make the difference between a request (0) and a response (1). Example fo |0x001b |Yes +|<<streamstats>> +|Client +|0x001c +|Yes + |=== === DeclarePublisher @@ -370,7 +375,7 @@ Deliver => Key Version SubscriptionId CommittedOffset OsirisChunk Key => uint16 // 0x0008 Version => uint16 SubscriptionId => uint8 - CommittedOffset => uint64 + CommittedChunkId => uint64 OsirisChunk => MagicVersion NumEntries NumRecords Epoch ChunkFirstOffset ChunkCrc DataLength Messages MagicVersion => int8 ChunkType => int8 // 0: user, 1: tracking delta, 2: tracking snapshot @@ -702,6 +707,26 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma MaxVersion => uint16 ``` +=== StreamStats + +``` +StreamStatsRequest => Key Version CorrelationId Stream + Key => uint16 // 0x001c + Version => uint16 + CorrelationId => uint32 + Stream => string + +StreamStatsResponse => Key Version CorrelationId ResponseCode Stats + Key => uint16 // 0x801c + Version => uint16 + CorrelationId => uint32 + ResponseCode => uint16 + Stats => [Statistic] + Statistic => Key Value + Key => string + Value => int64 +``` + == Authentication diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index da93d89d94..e13483ceb5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -33,6 +33,7 @@ delete_super_stream/3, lookup_leader/2, lookup_local_member/2, + lookup_member/2, topology/2, route/3, partitions/2, @@ -100,6 +101,12 @@ lookup_leader(VirtualHost, Stream) -> lookup_local_member(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}). +-spec lookup_member(binary(), binary()) -> + {ok, pid()} | {error, not_found} | + {error, not_available}. +lookup_member(VirtualHost, Stream) -> + gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}). + -spec topology(binary(), binary()) -> {ok, #{leader_node => undefined | pid(), @@ -292,119 +299,109 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username}, {reply, {error, Error}, State} end; handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of + LeaderPid = amqqueue:get_pid(Q), + case process_alive(LeaderPid) of true -> - LeaderPid = amqqueue:get_pid(Q), - case process_alive(LeaderPid) of - true -> - {ok, LeaderPid}; - false -> - case leader_from_members(Q) of - {ok, Pid} -> - {ok, Pid}; - _ -> - {error, not_available} - end - end; - _ -> - {error, not_found} + {ok, LeaderPid}; + false -> + case leader_from_members(Q) of + {ok, Pid} -> + {ok, Pid}; + _ -> + {error, not_available} + end end; - {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, not_found}; - _ -> - {error, not_available} - end + R -> + R end, {reply, Res, State}; handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of - true -> - #{name := StreamName} = amqqueue:get_type_state(Q), - % FIXME check if pid is alive in case of stale information - case rabbit_stream_coordinator:local_pid(StreamName) - of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {error, timeout} -> - {error, not_available}; - _ -> - {error, not_available} - end; - _ -> - {error, not_found} - end; - {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, not_found}; + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, timeout} -> + {error, not_available}; _ -> {error, not_available} - end + end; + R -> + R end, {reply, Res, State}; -handle_call({topology, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of +handle_call({lookup_member, VirtualHost, Stream}, _From, State) -> + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of - true -> - QState = amqqueue:get_type_state(Q), - #{name := StreamName} = QState, + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + _ -> case rabbit_stream_coordinator:members(StreamName) of {ok, Members} -> - {ok, - maps:fold(fun (_Node, {undefined, _Role}, - Acc) -> - Acc; - (LeaderNode, {_Pid, writer}, - Acc) -> - Acc#{leader_node => - LeaderNode}; - (ReplicaNode, {_Pid, replica}, - Acc) -> - #{replica_nodes := - ReplicaNodes} = - Acc, - Acc#{replica_nodes => - ReplicaNodes - ++ [ReplicaNode]}; - (_Node, _, Acc) -> - Acc - end, - #{leader_node => undefined, - replica_nodes => []}, - Members)}; + case lists:search(fun ({undefined, _Role}) -> + false; + ({P, _Role}) + when is_pid(P) -> + process_alive(P); + (_) -> + false + end, + maps:values(Members)) + of + {value, {Pid, _Role}} -> + {ok, Pid}; + _ -> + {error, not_available} + end; _ -> - {error, stream_not_available} - end; + {error, not_available} + end + end; + R -> + R + end, + {reply, Res, State}; +handle_call({topology, VirtualHost, Stream}, _From, State) -> + Res = case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + QState = amqqueue:get_type_state(Q), + #{name := StreamName} = QState, + case rabbit_stream_coordinator:members(StreamName) of + {ok, Members} -> + {ok, + maps:fold(fun (_Node, {undefined, _Role}, Acc) -> + Acc; + (LeaderNode, {_Pid, writer}, Acc) -> + Acc#{leader_node => LeaderNode}; + (ReplicaNode, {_Pid, replica}, Acc) -> + #{replica_nodes := ReplicaNodes} = + Acc, + Acc#{replica_nodes => + ReplicaNodes + ++ [ReplicaNode]}; + (_Node, _, Acc) -> + Acc + end, + #{leader_node => undefined, + replica_nodes => []}, + Members)}; _ -> - {error, stream_not_found} + {error, not_available} end; {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, stream_not_found}; - _ -> - {error, stream_not_available} - end + {error, stream_not_found}; + {error, not_available} -> + {error, stream_not_available}; + R -> + R end, {reply, Res, State}; handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, @@ -837,6 +834,28 @@ delete_super_stream_exchange(VirtualHost, Name, Username) -> {error, validation_failed} end. +lookup_stream(VirtualHost, Stream) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> + case is_stream_queue(Q) of + true -> + {ok, Q}; + _ -> + {error, not_found} + end; + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, not_found}; + _ -> + {error, not_available} + end + end. + leader_from_members(Q) -> QState = amqqueue:get_type_state(Q), #{name := StreamName} = QState, @@ -860,7 +879,12 @@ process_alive(Pid) -> CurrentNode -> is_process_alive(Pid); OtherNode -> - rpc:call(OtherNode, erlang, is_process_alive, [Pid], 10000) + case rpc:call(OtherNode, erlang, is_process_alive, [Pid], 10000) of + B when is_boolean(B) -> + B; + _ -> + false + end end. is_stream_queue(Q) -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 2db2e24649..bcd6c69618 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1734,7 +1734,6 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {query_publisher_sequence, Reference, Stream}}) -> - % FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, kind = queue, @@ -2642,6 +2641,52 @@ handle_frame_post_auth(Transport, process_client_command_versions(Connection0, CommandVersions), {Connection1, State}; handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = User} = + Connection, + State, + {request, CorrelationId, {stream_stats, Stream}}) -> + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + Response = + case rabbit_stream_utils:check_read_permitted(QueueResource, User, + #{}) + of + ok -> + case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of + {error, not_available} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {stream_stats, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, + #{}}; + {error, not_found} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_DOES_NOT_EXIST, + 1), + {stream_stats, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, + #{}}; + {ok, MemberPid} -> + StreamStats = + maps:fold(fun(K, V, Acc) -> + Acc#{atom_to_binary(K) => V} + end, + #{}, osiris:get_stats(MemberPid)), + {stream_stats, ?RESPONSE_CODE_OK, StreamStats} + end; + error -> + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {stream_stats, ?RESPONSE_CODE_ACCESS_REFUSED, #{}} + end, + Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), + send(Transport, S, Frame), + {Connection, State}; +handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, {request, CorrelationId, @@ -3187,7 +3232,7 @@ send_file_callback(?VERSION_2, fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries}, Size) -> FrameSize = 2 + 2 + 1 + 8 + Size, - CommittedOffset = + CommittedChunkId = case osiris_log:committed_offset(Log) of undefined -> 0; R -> R @@ -3198,7 +3243,7 @@ send_file_callback(?VERSION_2, ?COMMAND_DELIVER:15, ?VERSION_2:16, SubscriptionId:8/unsigned, - CommittedOffset:64>>, + CommittedChunkId:64>>, Transport:send(S, FrameBeginning), atomics:add(Counter, 1, Size), increase_messages_consumed(Counters, NumEntries), diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index c9b6e5f48a..55e1c71077 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -275,4 +275,5 @@ command_versions() -> {close, ?VERSION_1, ?VERSION_1}, {heartbeat, ?VERSION_1, ?VERSION_1}, {route, ?VERSION_1, ?VERSION_1}, - {partitions, ?VERSION_1, ?VERSION_1}]. + {partitions, ?VERSION_1, ?VERSION_1}, + {stream_stats, ?VERSION_1, ?VERSION_1}]. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 1262e767df..f6ca661ac1 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -440,8 +440,10 @@ test_server(Transport, Config) -> C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11), C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12), - C14 = test_delete_stream(Transport, S, Stream, C13), - _C15 = test_close(Transport, S, C14), + C14 = test_stream_stats(Transport, S, Stream, C13), + + C15 = test_delete_stream(Transport, S, Stream, C14), + _C16 = test_close(Transport, S, C15), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -620,6 +622,18 @@ test_exchange_command_versions(Transport, S, C0) -> Cmd), C. +test_stream_stats(Transport, S, Stream, C0) -> + SICmd = {request, 1, {stream_stats, Stream}}, + SIFrame = rabbit_stream_core:frame(SICmd), + ok = Transport:send(S, SIFrame), + {Cmd, C} = receive_commands(Transport, S, C0), + ?assertEqual({response, 1, + {stream_stats, ?RESPONSE_CODE_OK, + #{<<"first_chunk_id">> => 0, + <<"committed_chunk_id">> => 1}}}, + Cmd), + C. + test_close(Transport, S, C0) -> CloseReason = <<"OK">>, CloseFrame = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 0a5eb2b7d3..ede66b2d52 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -18,7 +18,10 @@ all() -> groups() -> [{non_parallel_tests, [], - [manage_super_stream, lookup_leader, partition_index]}]. + [manage_super_stream, + lookup_leader, + lookup_member, + partition_index]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -84,6 +87,17 @@ lookup_leader(Config) -> ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). +lookup_member(Config) -> + Stream = <<"stream_manager_lookup_member_stream">>, + ?assertMatch({ok, _}, create_stream(Config, Stream)), + + {ok, Pid} = lookup_member(Config, Stream), + ?assert(is_pid(Pid)), + + ?assertEqual({error, not_found}, lookup_member(Config, <<"foo">>)), + + ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). + manage_super_stream(Config) -> % create super stream ?assertEqual(ok, @@ -215,6 +229,13 @@ lookup_leader(Config, Name) -> lookup_leader, [<<"/">>, Name]). +lookup_member(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + lookup_member, + [<<"/">>, Name]). + partitions(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index a9c20101ee..5f46cd8bff 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -25,6 +25,7 @@ -define(COMMAND_PARTITIONS, 25). -define(COMMAND_CONSUMER_UPDATE, 26). -define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27). +-define(COMMAND_STREAM_STATS, 28). -define(REQUEST, 0). -define(RESPONSE, 1). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index 41f99b2bd5..da0c0c3fba 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -75,7 +75,7 @@ {deliver, subscription_id(), Chunk :: binary()} | {deliver_v2, subscription_id(), - CommittedOffset :: osiris:offset(), + CommittedChunkId :: osiris:offset(), Chunk :: binary()} | {credit, subscription_id(), Credit :: non_neg_integer()} | {metadata_update, stream_name(), response_code()} | @@ -108,7 +108,8 @@ {consumer_update, subscription_id(), active()} | {exchange_command_versions, [{Command :: atom(), MinVersion :: command_version(), - MaxVersion :: command_version()}]}} | + MaxVersion :: command_version()}]} | + {stream_stats, Stream :: binary()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -138,7 +139,8 @@ {consumer_update, response_code(), none | offset_spec()} | {exchange_command_versions, response_code(), [{Command :: atom(), MinVersion :: command_version(), - MaxVersion :: command_version()}]}} | + MaxVersion :: command_version()}]} | + {stream_stats, response_code(), Stats :: #{binary() => integer()}}} | {unknown, binary()}. -spec init(term()) -> state(). @@ -243,12 +245,12 @@ frame({deliver, SubscriptionId, Chunk}) -> ?VERSION_1:16, SubscriptionId:8>>, Chunk]); -frame({deliver_v2, SubscriptionId, CommittedOffset, Chunk}) -> +frame({deliver_v2, SubscriptionId, CommittedChunkId, Chunk}) -> wrap_in_frame([<<?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_2:16, SubscriptionId:8, - CommittedOffset:64>>, + CommittedChunkId:64>>, Chunk]); frame({metadata_update, Stream, ResponseCode}) -> StreamSize = byte_size(Stream), @@ -471,7 +473,18 @@ response_body({exchange_command_versions = Tag, Code, end, [], CommandVersions), {command_id(Tag), - [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}. + [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}; +response_body({stream_stats = Tag, Code, Stats}) -> + Init = <<Code:16, (maps:size(Stats)):32>>, + {command_id(Tag), + maps:fold(fun(Key, Value, Acc) -> + KeySize = byte_size(Key), + <<Acc/binary, + KeySize:16, + Key:KeySize/binary, + Value:64/signed>> + end, + Init, Stats)}. request_body({declare_publisher = Tag, PublisherId, @@ -576,7 +589,9 @@ request_body({exchange_command_versions = Tag, CommandVersions}) -> end, [], CommandVersions), CommandVersionsLength = length(CommandVersions), - {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}. + {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}; +request_body({stream_stats = Tag, Stream}) -> + {Tag, <<?STRING(Stream)>>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -622,9 +637,9 @@ parse_request(<<?REQUEST:1, ?COMMAND_DELIVER:15, ?VERSION_2:16, SubscriptionId:8, - CommittedOffset:64, + CommittedChunkId:64, Chunk/binary>>) -> - {deliver_v2, SubscriptionId, CommittedOffset, Chunk}; + {deliver_v2, SubscriptionId, CommittedChunkId, Chunk}; parse_request(<<?REQUEST:1, ?COMMAND_CREDIT:15, ?VERSION_1:16, @@ -843,6 +858,12 @@ parse_request(<<?REQUEST:1, CommandVersionsBin/binary>>) -> CommandVersions = parse_command_versions(CommandVersionsBin), request(CorrelationId, {exchange_command_versions, CommandVersions}); +parse_request(<<?REQUEST:1, + ?COMMAND_STREAM_STATS:15, + ?VERSION_1:16, + CorrelationId:32, + ?STRING(StreamSize, Stream)>>) -> + request(CorrelationId, {stream_stats, Stream}); parse_request(Bin) -> {unknown, Bin}. @@ -928,7 +949,11 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS, <<ResponseCode:16, _CommandVersionsCount:32, CommandVersionsBin/binary>>) -> CommandVersions = parse_command_versions(CommandVersionsBin), - {exchange_command_versions, ResponseCode, CommandVersions}. + {exchange_command_versions, ResponseCode, CommandVersions}; +parse_response_body(?COMMAND_STREAM_STATS, + <<ResponseCode:16, _Count:32, StatsBin/binary>>) -> + Info = parse_int_map(StatsBin, #{}), + {stream_stats, ResponseCode, Info}. offset_spec(OffsetType, OffsetValueBin) -> case OffsetType of @@ -1000,6 +1025,11 @@ parse_map(<<?STRING(KeySize, Key), ?STRING(ValSize, Value), Acc) -> parse_map(Rem, Acc#{Key => Value}). +parse_int_map(<<>>, Acc) -> + Acc; +parse_int_map(<<?STRING(KeySize, Key), Value:64, Rem/binary>>, Acc) -> + parse_int_map(Rem, Acc#{Key => Value}). + generate_map(Map) -> maps:fold(fun(K, V, Acc) -> [<<?STRING(K), ?STRING(V)>> | Acc] end, [], Map). @@ -1079,7 +1109,9 @@ command_id(partitions) -> command_id(consumer_update) -> ?COMMAND_CONSUMER_UPDATE; command_id(exchange_command_versions) -> - ?COMMAND_EXCHANGE_COMMAND_VERSIONS. + ?COMMAND_EXCHANGE_COMMAND_VERSIONS; +command_id(stream_stats) -> + ?COMMAND_STREAM_STATS. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -1134,7 +1166,9 @@ parse_command_id(?COMMAND_PARTITIONS) -> parse_command_id(?COMMAND_CONSUMER_UPDATE) -> consumer_update; parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) -> - exchange_command_versions. + exchange_command_versions; +parse_command_id(?COMMAND_STREAM_STATS) -> + stream_stats. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index a37daceda8..8e40b0f8d6 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -108,6 +108,8 @@ roundtrip(_Config) -> test_roundtrip({request, 99, {exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_1}]}}), + test_roundtrip({request, 99, {stream_stats, <<"stream_name">>}}), + %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) || Tag @@ -140,6 +142,8 @@ roundtrip(_Config) -> test_roundtrip({response, 99, {exchange_command_versions, 1, [{publish, ?VERSION_1, ?VERSION_1}]}}), + test_roundtrip({response, 99, + {stream_stats, 1, #{<<"committed_offset">> => 42}}}), ok. roundtrip_metadata(_Config) -> |