summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2022-08-03 14:38:45 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2022-08-03 14:38:45 +0200
commit8687e73c7e7e3f89e267749d39077bdd4ed8eef5 (patch)
treebf9a08778a6bdb56fd74bd36f2c44501b43b073b
parent589ed430e280bd09efcbb5ab634a7a202bdcbb19 (diff)
downloadrabbitmq-server-git-8687e73c7e7e3f89e267749d39077bdd4ed8eef5.tar.gz
Add StreamInfo command to stream protocol
It returns general information on a stream, the first and committed offsets for now. Fixes #5412
-rw-r--r--deps/rabbitmq_stream/docs/PROTOCOL.adoc25
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl207
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl58
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl3
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl18
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl23
-rw-r--r--deps/rabbitmq_stream_common/include/rabbit_stream.hrl1
-rw-r--r--deps/rabbitmq_stream_common/src/rabbit_stream_core.erl49
-rw-r--r--deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl4
9 files changed, 278 insertions, 110 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
index 2f02a95e9f..bae72dd6d8 100644
--- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc
+++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc
@@ -226,6 +226,11 @@ used to make the difference between a request (0) and a response (1). Example fo
|0x001b
|Yes
+|<<streaminfo>>
+|Client
+|0x001c
+|Yes
+
|===
=== DeclarePublisher
@@ -702,6 +707,26 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma
MaxVersion => uint16
```
+=== StreamInfo
+
+```
+StreamInfoRequest => Key Version CorrelationId Stream
+ Key => uint16 // 0x001c
+ Version => uint16
+ CorrelationId => uint32
+ Stream => string
+
+StreamInfoResponse => Key Version CorrelationId ResponseCode Info
+ Key => uint16 // 0x801c
+ Version => uint16
+ CorrelationId => uint32
+ ResponseCode => uint16
+ Info => [PieceOfInfo]
+ PieceOfInfo => Key Value
+ Key => string
+ Value => string
+```
+
== Authentication
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index da93d89d94..15db2a05a2 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -33,6 +33,7 @@
delete_super_stream/3,
lookup_leader/2,
lookup_local_member/2,
+ lookup_member/2,
topology/2,
route/3,
partitions/2,
@@ -100,11 +101,17 @@ lookup_leader(VirtualHost, Stream) ->
lookup_local_member(VirtualHost, Stream) ->
gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}).
+-spec lookup_member(binary(), binary()) ->
+ {ok, pid()} | {error, not_found} |
+ {error, not_available}.
+lookup_member(VirtualHost, Stream) ->
+ gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}).
+
-spec topology(binary(), binary()) ->
{ok,
#{leader_node => undefined | pid(),
replica_nodes => [pid()]}} |
- {error, stream_not_found} | {error, stream_not_available}.
+ {error, not_found} | {error, not_available}.
topology(VirtualHost, Stream) ->
gen_server:call(?MODULE, {topology, VirtualHost, Stream}).
@@ -292,119 +299,105 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username},
{reply, {error, Error}, State}
end;
handle_call({lookup_leader, VirtualHost, Stream}, _From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
+ LeaderPid = amqqueue:get_pid(Q),
+ case process_alive(LeaderPid) of
true ->
- LeaderPid = amqqueue:get_pid(Q),
- case process_alive(LeaderPid) of
- true ->
- {ok, LeaderPid};
- false ->
- case leader_from_members(Q) of
- {ok, Pid} ->
- {ok, Pid};
- _ ->
- {error, not_available}
- end
- end;
- _ ->
- {error, not_found}
+ {ok, LeaderPid};
+ false ->
+ case leader_from_members(Q) of
+ {ok, Pid} ->
+ {ok, Pid};
+ _ ->
+ {error, not_available}
+ end
end;
- {error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, not_found};
- _ ->
- {error, not_available}
- end
+ R ->
+ R
end,
{reply, Res, State};
handle_call({lookup_local_member, VirtualHost, Stream}, _From,
State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
- true ->
- #{name := StreamName} = amqqueue:get_type_state(Q),
- % FIXME check if pid is alive in case of stale information
- case rabbit_stream_coordinator:local_pid(StreamName)
- of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {error, timeout} ->
- {error, not_available};
- _ ->
- {error, not_available}
- end;
- _ ->
- {error, not_found}
- end;
- {error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, not_found};
+ #{name := StreamName} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
+ case rabbit_stream_coordinator:local_pid(StreamName) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ {error, timeout} ->
+ {error, not_available};
_ ->
{error, not_available}
- end
+ end;
+ R ->
+ R
end,
{reply, Res, State};
-handle_call({topology, VirtualHost, Stream}, _From, State) ->
- Name =
- #resource{virtual_host = VirtualHost,
- kind = queue,
- name = Stream},
- Res = case rabbit_amqqueue:lookup(Name) of
+handle_call({lookup_member, VirtualHost, Stream}, _From, State) ->
+ Res = case lookup_stream(VirtualHost, Stream) of
{ok, Q} ->
- case is_stream_queue(Q) of
- true ->
- QState = amqqueue:get_type_state(Q),
- #{name := StreamName} = QState,
+ #{name := StreamName} = amqqueue:get_type_state(Q),
+ % FIXME check if pid is alive in case of stale information
+ case rabbit_stream_coordinator:local_pid(StreamName) of
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid};
+ _ ->
case rabbit_stream_coordinator:members(StreamName) of
{ok, Members} ->
- {ok,
- maps:fold(fun (_Node, {undefined, _Role},
- Acc) ->
- Acc;
- (LeaderNode, {_Pid, writer},
- Acc) ->
- Acc#{leader_node =>
- LeaderNode};
- (ReplicaNode, {_Pid, replica},
- Acc) ->
- #{replica_nodes :=
- ReplicaNodes} =
- Acc,
- Acc#{replica_nodes =>
- ReplicaNodes
- ++ [ReplicaNode]};
- (_Node, _, Acc) ->
- Acc
- end,
- #{leader_node => undefined,
- replica_nodes => []},
- Members)};
+ case lists:search(fun ({undefined, _Role}) ->
+ false;
+ ({P, _Role})
+ when is_pid(P) ->
+ is_process_alive(P);
+ (_) ->
+ false
+ end,
+ maps:values(Members))
+ of
+ {value, {Pid, _Role}} ->
+ {ok, Pid};
+ _ ->
+ {error, not_available}
+ end;
_ ->
- {error, stream_not_available}
- end;
- _ ->
- {error, stream_not_found}
+ {error, not_available}
+ end
end;
- {error, not_found} ->
- case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
- not_found ->
- {error, stream_not_found};
+ R ->
+ R
+ end,
+ {reply, Res, State};
+handle_call({topology, VirtualHost, Stream}, _From, State) ->
+ Res = case lookup_stream(VirtualHost, Stream) of
+ {ok, Q} ->
+ QState = amqqueue:get_type_state(Q),
+ #{name := StreamName} = QState,
+ case rabbit_stream_coordinator:members(StreamName) of
+ {ok, Members} ->
+ {ok,
+ maps:fold(fun (_Node, {undefined, _Role}, Acc) ->
+ Acc;
+ (LeaderNode, {_Pid, writer}, Acc) ->
+ Acc#{leader_node => LeaderNode};
+ (ReplicaNode, {_Pid, replica}, Acc) ->
+ #{replica_nodes := ReplicaNodes} =
+ Acc,
+ Acc#{replica_nodes =>
+ ReplicaNodes
+ ++ [ReplicaNode]};
+ (_Node, _, Acc) ->
+ Acc
+ end,
+ #{leader_node => undefined,
+ replica_nodes => []},
+ Members)};
_ ->
- {error, stream_not_available}
- end
+ {error, not_available}
+ end;
+ R ->
+ R
end,
{reply, Res, State};
handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
@@ -837,6 +830,28 @@ delete_super_stream_exchange(VirtualHost, Name, Username) ->
{error, validation_failed}
end.
+lookup_stream(VirtualHost, Stream) ->
+ Name =
+ #resource{virtual_host = VirtualHost,
+ kind = queue,
+ name = Stream},
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, Q} ->
+ case is_stream_queue(Q) of
+ true ->
+ {ok, Q};
+ _ ->
+ {error, not_found}
+ end;
+ {error, not_found} ->
+ case rabbit_amqqueue:not_found_or_absent_dirty(Name) of
+ not_found ->
+ {error, not_found};
+ _ ->
+ {error, not_available}
+ end
+ end.
+
leader_from_members(Q) ->
QState = amqqueue:get_type_state(Q),
#{name := StreamName} = QState,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 2db2e24649..81265729c9 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -1734,7 +1734,6 @@ handle_frame_post_auth(Transport,
State,
{request, CorrelationId,
{query_publisher_sequence, Reference, Stream}}) ->
- % FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Sequence} =
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
kind = queue,
@@ -2642,6 +2641,63 @@ handle_frame_post_auth(Transport,
process_client_command_versions(Connection0, CommandVersions),
{Connection1, State};
handle_frame_post_auth(Transport,
+ #stream_connection{socket = S,
+ virtual_host = VirtualHost,
+ user = User} =
+ Connection,
+ State,
+ {request, CorrelationId, {stream_info, Stream}}) ->
+ QueueResource =
+ #resource{name = Stream,
+ kind = queue,
+ virtual_host = VirtualHost},
+ Response =
+ case rabbit_stream_utils:check_read_permitted(QueueResource, User,
+ #{})
+ of
+ ok ->
+ case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of
+ {error, not_available} ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_NOT_AVAILABLE,
+ 1),
+ {stream_info, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, #{}};
+ {error, not_found} ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?STREAM_DOES_NOT_EXIST,
+ 1),
+ {stream_info, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST,
+ #{}};
+ {ok, MemberPid} ->
+ OffsetInfo =
+ case gen:call(MemberPid, '$gen_call',
+ get_reader_context)
+ of
+ {ok, #{offset_ref := undefined}} ->
+ #{};
+ {ok, #{offset_ref := OffsetRef}} ->
+ #{<<"first_offset">> =>
+ rabbit_data_coercion:to_binary(
+ atomics:get(OffsetRef, 2)),
+ <<"committed_offset">> =>
+ rabbit_data_coercion:to_binary(
+ atomics:get(OffsetRef, 1))};
+ _ ->
+ #{}
+ end,
+
+ {stream_info, ?RESPONSE_CODE_OK, OffsetInfo}
+ end;
+ error ->
+ rabbit_global_counters:increase_protocol_counter(stream,
+ ?ACCESS_REFUSED,
+ 1),
+ {stream_info, ?RESPONSE_CODE_ACCESS_REFUSED, #{}}
+ end,
+ Frame = rabbit_stream_core:frame({response, CorrelationId, Response}),
+ send(Transport, S, Frame),
+ {Connection, State};
+handle_frame_post_auth(Transport,
#stream_connection{socket = S} = Connection,
State,
{request, CorrelationId,
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index c9b6e5f48a..4f4499036d 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -275,4 +275,5 @@ command_versions() ->
{close, ?VERSION_1, ?VERSION_1},
{heartbeat, ?VERSION_1, ?VERSION_1},
{route, ?VERSION_1, ?VERSION_1},
- {partitions, ?VERSION_1, ?VERSION_1}].
+ {partitions, ?VERSION_1, ?VERSION_1},
+ {stream_info, ?VERSION_1, ?VERSION_1}].
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
index 1262e767df..96782b3160 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
@@ -440,8 +440,10 @@ test_server(Transport, Config) ->
C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11),
C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12),
- C14 = test_delete_stream(Transport, S, Stream, C13),
- _C15 = test_close(Transport, S, C14),
+ C14 = test_stream_info(Transport, S, Stream, C13),
+
+ C15 = test_delete_stream(Transport, S, Stream, C14),
+ _C16 = test_close(Transport, S, C15),
closed = wait_for_socket_close(Transport, S, 10),
ok.
@@ -620,6 +622,18 @@ test_exchange_command_versions(Transport, S, C0) ->
Cmd),
C.
+test_stream_info(Transport, S, Stream, C0) ->
+ SICmd = {request, 1, {stream_info, Stream}},
+ SIFrame = rabbit_stream_core:frame(SICmd),
+ ok = Transport:send(S, SIFrame),
+ {Cmd, C} = receive_commands(Transport, S, C0),
+ ?assertEqual({response, 1,
+ {stream_info, ?RESPONSE_CODE_OK,
+ #{<<"committed_offset">> => <<"1">>,
+ <<"first_offset">> => <<"0">>}}},
+ Cmd),
+ C.
+
test_close(Transport, S, C0) ->
CloseReason = <<"OK">>,
CloseFrame =
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
index 0a5eb2b7d3..ede66b2d52 100644
--- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
+++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl
@@ -18,7 +18,10 @@ all() ->
groups() ->
[{non_parallel_tests, [],
- [manage_super_stream, lookup_leader, partition_index]}].
+ [manage_super_stream,
+ lookup_leader,
+ lookup_member,
+ partition_index]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@@ -84,6 +87,17 @@ lookup_leader(Config) ->
?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
+lookup_member(Config) ->
+ Stream = <<"stream_manager_lookup_member_stream">>,
+ ?assertMatch({ok, _}, create_stream(Config, Stream)),
+
+ {ok, Pid} = lookup_member(Config, Stream),
+ ?assert(is_pid(Pid)),
+
+ ?assertEqual({error, not_found}, lookup_member(Config, <<"foo">>)),
+
+ ?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
+
manage_super_stream(Config) ->
% create super stream
?assertEqual(ok,
@@ -215,6 +229,13 @@ lookup_leader(Config, Name) ->
lookup_leader,
[<<"/">>, Name]).
+lookup_member(Config, Name) ->
+ rabbit_ct_broker_helpers:rpc(Config,
+ 0,
+ rabbit_stream_manager,
+ lookup_member,
+ [<<"/">>, Name]).
+
partitions(Config, Name) ->
rabbit_ct_broker_helpers:rpc(Config,
0,
diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
index a9c20101ee..c925ed7f5d 100644
--- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl
@@ -25,6 +25,7 @@
-define(COMMAND_PARTITIONS, 25).
-define(COMMAND_CONSUMER_UPDATE, 26).
-define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27).
+-define(COMMAND_STREAM_INFO, 28).
-define(REQUEST, 0).
-define(RESPONSE, 1).
diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
index 41f99b2bd5..34ad9f9232 100644
--- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
+++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl
@@ -108,7 +108,8 @@
{consumer_update, subscription_id(), active()} |
{exchange_command_versions,
[{Command :: atom(), MinVersion :: command_version(),
- MaxVersion :: command_version()}]}} |
+ MaxVersion :: command_version()}]} |
+ {stream_info, Stream :: binary()}} |
{response, correlation_id(),
{declare_publisher |
delete_publisher |
@@ -138,7 +139,8 @@
{consumer_update, response_code(), none | offset_spec()} |
{exchange_command_versions, response_code(),
[{Command :: atom(), MinVersion :: command_version(),
- MaxVersion :: command_version()}]}} |
+ MaxVersion :: command_version()}]} |
+ {stream_info, response_code(), Info :: #{binary() => binary()}}} |
{unknown, binary()}.
-spec init(term()) -> state().
@@ -404,9 +406,9 @@ response_body({metadata = Tag, Endpoints, Metadata}) ->
maps:fold(fun (Stream, Info, Acc) when is_atom(Info) ->
Code =
case Info of
- stream_not_found ->
+ not_found ->
?RESPONSE_CODE_STREAM_DOES_NOT_EXIST;
- stream_not_available ->
+ not_available ->
?RESPONSE_CODE_STREAM_NOT_AVAILABLE
end,
StreamLength = byte_size(Stream),
@@ -471,7 +473,20 @@ response_body({exchange_command_versions = Tag, Code,
end,
[], CommandVersions),
{command_id(Tag),
- [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}.
+ [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]};
+response_body({stream_info = Tag, Code, Info}) ->
+ Init = <<Code:16, (maps:size(Info)):32>>,
+ {command_id(Tag),
+ maps:fold(fun(Key, Value, Acc) ->
+ KeySize = byte_size(Key),
+ ValueSize = byte_size(Value),
+ <<Acc/binary,
+ KeySize:16,
+ Key:KeySize/binary,
+ ValueSize:16,
+ Value:ValueSize/binary>>
+ end,
+ Init, Info)}.
request_body({declare_publisher = Tag,
PublisherId,
@@ -576,7 +591,9 @@ request_body({exchange_command_versions = Tag, CommandVersions}) ->
end,
[], CommandVersions),
CommandVersionsLength = length(CommandVersions),
- {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}.
+ {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]};
+request_body({stream_info = Tag, Stream}) ->
+ {Tag, <<?STRING(Stream)>>}.
append_data(Prev, Data) when is_binary(Prev) ->
[Prev, Data];
@@ -843,6 +860,12 @@ parse_request(<<?REQUEST:1,
CommandVersionsBin/binary>>) ->
CommandVersions = parse_command_versions(CommandVersionsBin),
request(CorrelationId, {exchange_command_versions, CommandVersions});
+parse_request(<<?REQUEST:1,
+ ?COMMAND_STREAM_INFO:15,
+ ?VERSION_1:16,
+ CorrelationId:32,
+ ?STRING(StreamSize, Stream)>>) ->
+ request(CorrelationId, {stream_info, Stream});
parse_request(Bin) ->
{unknown, Bin}.
@@ -928,7 +951,11 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS,
<<ResponseCode:16, _CommandVersionsCount:32,
CommandVersionsBin/binary>>) ->
CommandVersions = parse_command_versions(CommandVersionsBin),
- {exchange_command_versions, ResponseCode, CommandVersions}.
+ {exchange_command_versions, ResponseCode, CommandVersions};
+parse_response_body(?COMMAND_STREAM_INFO,
+ <<ResponseCode:16, _Count:32, InfoBin/binary>>) ->
+ Info = parse_map(InfoBin, #{}),
+ {stream_info, ResponseCode, Info}.
offset_spec(OffsetType, OffsetValueBin) ->
case OffsetType of
@@ -1079,7 +1106,9 @@ command_id(partitions) ->
command_id(consumer_update) ->
?COMMAND_CONSUMER_UPDATE;
command_id(exchange_command_versions) ->
- ?COMMAND_EXCHANGE_COMMAND_VERSIONS.
+ ?COMMAND_EXCHANGE_COMMAND_VERSIONS;
+command_id(stream_info) ->
+ ?COMMAND_STREAM_INFO.
parse_command_id(?COMMAND_DECLARE_PUBLISHER) ->
declare_publisher;
@@ -1134,7 +1163,9 @@ parse_command_id(?COMMAND_PARTITIONS) ->
parse_command_id(?COMMAND_CONSUMER_UPDATE) ->
consumer_update;
parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) ->
- exchange_command_versions.
+ exchange_command_versions;
+parse_command_id(?COMMAND_STREAM_INFO) ->
+ stream_info.
element_index(Element, List) ->
element_index(Element, List, 0).
diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
index a37daceda8..fbc11d9370 100644
--- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
+++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl
@@ -108,6 +108,8 @@ roundtrip(_Config) ->
test_roundtrip({request, 99,
{exchange_command_versions,
[{deliver, ?VERSION_1, ?VERSION_1}]}}),
+ test_roundtrip({request, 99, {stream_info, <<"stream_name">>}}),
+
%% RESPONSES
[test_roundtrip({response, 99, {Tag, 53}})
|| Tag
@@ -140,6 +142,8 @@ roundtrip(_Config) ->
test_roundtrip({response, 99,
{exchange_command_versions, 1,
[{publish, ?VERSION_1, ?VERSION_1}]}}),
+ test_roundtrip({response, 99,
+ {stream_info, 1, #{<<"committed_offset">> => <<"42">>}}}),
ok.
roundtrip_metadata(_Config) ->