diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2022-08-03 14:38:45 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2022-08-03 14:38:45 +0200 |
commit | 8687e73c7e7e3f89e267749d39077bdd4ed8eef5 (patch) | |
tree | bf9a08778a6bdb56fd74bd36f2c44501b43b073b | |
parent | 589ed430e280bd09efcbb5ab634a7a202bdcbb19 (diff) | |
download | rabbitmq-server-git-8687e73c7e7e3f89e267749d39077bdd4ed8eef5.tar.gz |
Add StreamInfo command to stream protocol
It returns general information on a stream, the first
and committed offsets for now.
Fixes #5412
9 files changed, 278 insertions, 110 deletions
diff --git a/deps/rabbitmq_stream/docs/PROTOCOL.adoc b/deps/rabbitmq_stream/docs/PROTOCOL.adoc index 2f02a95e9f..bae72dd6d8 100644 --- a/deps/rabbitmq_stream/docs/PROTOCOL.adoc +++ b/deps/rabbitmq_stream/docs/PROTOCOL.adoc @@ -226,6 +226,11 @@ used to make the difference between a request (0) and a response (1). Example fo |0x001b |Yes +|<<streaminfo>> +|Client +|0x001c +|Yes + |=== === DeclarePublisher @@ -702,6 +707,26 @@ CommandVersionsExchangeResponse => Key Version CorrelationId ResponseCode [Comma MaxVersion => uint16 ``` +=== StreamInfo + +``` +StreamInfoRequest => Key Version CorrelationId Stream + Key => uint16 // 0x001c + Version => uint16 + CorrelationId => uint32 + Stream => string + +StreamInfoResponse => Key Version CorrelationId ResponseCode Info + Key => uint16 // 0x801c + Version => uint16 + CorrelationId => uint32 + ResponseCode => uint16 + Info => [PieceOfInfo] + PieceOfInfo => Key Value + Key => string + Value => string +``` + == Authentication diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index da93d89d94..15db2a05a2 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -33,6 +33,7 @@ delete_super_stream/3, lookup_leader/2, lookup_local_member/2, + lookup_member/2, topology/2, route/3, partitions/2, @@ -100,11 +101,17 @@ lookup_leader(VirtualHost, Stream) -> lookup_local_member(VirtualHost, Stream) -> gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}). +-spec lookup_member(binary(), binary()) -> + {ok, pid()} | {error, not_found} | + {error, not_available}. +lookup_member(VirtualHost, Stream) -> + gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}). + -spec topology(binary(), binary()) -> {ok, #{leader_node => undefined | pid(), replica_nodes => [pid()]}} | - {error, stream_not_found} | {error, stream_not_available}. + {error, not_found} | {error, not_available}. topology(VirtualHost, Stream) -> gen_server:call(?MODULE, {topology, VirtualHost, Stream}). @@ -292,119 +299,105 @@ handle_call({delete_super_stream, VirtualHost, SuperStream, Username}, {reply, {error, Error}, State} end; handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of + LeaderPid = amqqueue:get_pid(Q), + case process_alive(LeaderPid) of true -> - LeaderPid = amqqueue:get_pid(Q), - case process_alive(LeaderPid) of - true -> - {ok, LeaderPid}; - false -> - case leader_from_members(Q) of - {ok, Pid} -> - {ok, Pid}; - _ -> - {error, not_available} - end - end; - _ -> - {error, not_found} + {ok, LeaderPid}; + false -> + case leader_from_members(Q) of + {ok, Pid} -> + {ok, Pid}; + _ -> + {error, not_available} + end end; - {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, not_found}; - _ -> - {error, not_available} - end + R -> + R end, {reply, Res, State}; handle_call({lookup_local_member, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of - true -> - #{name := StreamName} = amqqueue:get_type_state(Q), - % FIXME check if pid is alive in case of stale information - case rabbit_stream_coordinator:local_pid(StreamName) - of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {error, timeout} -> - {error, not_available}; - _ -> - {error, not_available} - end; - _ -> - {error, not_found} - end; - {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, not_found}; + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, timeout} -> + {error, not_available}; _ -> {error, not_available} - end + end; + R -> + R end, {reply, Res, State}; -handle_call({topology, VirtualHost, Stream}, _From, State) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, - Res = case rabbit_amqqueue:lookup(Name) of +handle_call({lookup_member, VirtualHost, Stream}, _From, State) -> + Res = case lookup_stream(VirtualHost, Stream) of {ok, Q} -> - case is_stream_queue(Q) of - true -> - QState = amqqueue:get_type_state(Q), - #{name := StreamName} = QState, + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + _ -> case rabbit_stream_coordinator:members(StreamName) of {ok, Members} -> - {ok, - maps:fold(fun (_Node, {undefined, _Role}, - Acc) -> - Acc; - (LeaderNode, {_Pid, writer}, - Acc) -> - Acc#{leader_node => - LeaderNode}; - (ReplicaNode, {_Pid, replica}, - Acc) -> - #{replica_nodes := - ReplicaNodes} = - Acc, - Acc#{replica_nodes => - ReplicaNodes - ++ [ReplicaNode]}; - (_Node, _, Acc) -> - Acc - end, - #{leader_node => undefined, - replica_nodes => []}, - Members)}; + case lists:search(fun ({undefined, _Role}) -> + false; + ({P, _Role}) + when is_pid(P) -> + is_process_alive(P); + (_) -> + false + end, + maps:values(Members)) + of + {value, {Pid, _Role}} -> + {ok, Pid}; + _ -> + {error, not_available} + end; _ -> - {error, stream_not_available} - end; - _ -> - {error, stream_not_found} + {error, not_available} + end end; - {error, not_found} -> - case rabbit_amqqueue:not_found_or_absent_dirty(Name) of - not_found -> - {error, stream_not_found}; + R -> + R + end, + {reply, Res, State}; +handle_call({topology, VirtualHost, Stream}, _From, State) -> + Res = case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + QState = amqqueue:get_type_state(Q), + #{name := StreamName} = QState, + case rabbit_stream_coordinator:members(StreamName) of + {ok, Members} -> + {ok, + maps:fold(fun (_Node, {undefined, _Role}, Acc) -> + Acc; + (LeaderNode, {_Pid, writer}, Acc) -> + Acc#{leader_node => LeaderNode}; + (ReplicaNode, {_Pid, replica}, Acc) -> + #{replica_nodes := ReplicaNodes} = + Acc, + Acc#{replica_nodes => + ReplicaNodes + ++ [ReplicaNode]}; + (_Node, _, Acc) -> + Acc + end, + #{leader_node => undefined, + replica_nodes => []}, + Members)}; _ -> - {error, stream_not_available} - end + {error, not_available} + end; + R -> + R end, {reply, Res, State}; handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, @@ -837,6 +830,28 @@ delete_super_stream_exchange(VirtualHost, Name, Username) -> {error, validation_failed} end. +lookup_stream(VirtualHost, Stream) -> + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> + case is_stream_queue(Q) of + true -> + {ok, Q}; + _ -> + {error, not_found} + end; + {error, not_found} -> + case rabbit_amqqueue:not_found_or_absent_dirty(Name) of + not_found -> + {error, not_found}; + _ -> + {error, not_available} + end + end. + leader_from_members(Q) -> QState = amqqueue:get_type_state(Q), #{name := StreamName} = QState, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 2db2e24649..81265729c9 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -1734,7 +1734,6 @@ handle_frame_post_auth(Transport, State, {request, CorrelationId, {query_publisher_sequence, Reference, Stream}}) -> - % FrameSize = ?RESPONSE_FRAME_SIZE + 8, {ResponseCode, Sequence} = case rabbit_stream_utils:check_read_permitted(#resource{name = Stream, kind = queue, @@ -2642,6 +2641,63 @@ handle_frame_post_auth(Transport, process_client_command_versions(Connection0, CommandVersions), {Connection1, State}; handle_frame_post_auth(Transport, + #stream_connection{socket = S, + virtual_host = VirtualHost, + user = User} = + Connection, + State, + {request, CorrelationId, {stream_info, Stream}}) -> + QueueResource = + #resource{name = Stream, + kind = queue, + virtual_host = VirtualHost}, + Response = + case rabbit_stream_utils:check_read_permitted(QueueResource, User, + #{}) + of + ok -> + case rabbit_stream_manager:lookup_member(VirtualHost, Stream) of + {error, not_available} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_NOT_AVAILABLE, + 1), + {stream_info, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE, #{}}; + {error, not_found} -> + rabbit_global_counters:increase_protocol_counter(stream, + ?STREAM_DOES_NOT_EXIST, + 1), + {stream_info, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, + #{}}; + {ok, MemberPid} -> + OffsetInfo = + case gen:call(MemberPid, '$gen_call', + get_reader_context) + of + {ok, #{offset_ref := undefined}} -> + #{}; + {ok, #{offset_ref := OffsetRef}} -> + #{<<"first_offset">> => + rabbit_data_coercion:to_binary( + atomics:get(OffsetRef, 2)), + <<"committed_offset">> => + rabbit_data_coercion:to_binary( + atomics:get(OffsetRef, 1))}; + _ -> + #{} + end, + + {stream_info, ?RESPONSE_CODE_OK, OffsetInfo} + end; + error -> + rabbit_global_counters:increase_protocol_counter(stream, + ?ACCESS_REFUSED, + 1), + {stream_info, ?RESPONSE_CODE_ACCESS_REFUSED, #{}} + end, + Frame = rabbit_stream_core:frame({response, CorrelationId, Response}), + send(Transport, S, Frame), + {Connection, State}; +handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, State, {request, CorrelationId, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index c9b6e5f48a..4f4499036d 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -275,4 +275,5 @@ command_versions() -> {close, ?VERSION_1, ?VERSION_1}, {heartbeat, ?VERSION_1, ?VERSION_1}, {route, ?VERSION_1, ?VERSION_1}, - {partitions, ?VERSION_1, ?VERSION_1}]. + {partitions, ?VERSION_1, ?VERSION_1}, + {stream_info, ?VERSION_1, ?VERSION_1}]. diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl index 1262e767df..96782b3160 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl @@ -440,8 +440,10 @@ test_server(Transport, Config) -> C12 = test_deliver_v2(Transport, S, SubscriptionId2, 0, Body, C11), C13 = test_deliver_v2(Transport, S, SubscriptionId2, 1, Body, C12), - C14 = test_delete_stream(Transport, S, Stream, C13), - _C15 = test_close(Transport, S, C14), + C14 = test_stream_info(Transport, S, Stream, C13), + + C15 = test_delete_stream(Transport, S, Stream, C14), + _C16 = test_close(Transport, S, C15), closed = wait_for_socket_close(Transport, S, 10), ok. @@ -620,6 +622,18 @@ test_exchange_command_versions(Transport, S, C0) -> Cmd), C. +test_stream_info(Transport, S, Stream, C0) -> + SICmd = {request, 1, {stream_info, Stream}}, + SIFrame = rabbit_stream_core:frame(SICmd), + ok = Transport:send(S, SIFrame), + {Cmd, C} = receive_commands(Transport, S, C0), + ?assertEqual({response, 1, + {stream_info, ?RESPONSE_CODE_OK, + #{<<"committed_offset">> => <<"1">>, + <<"first_offset">> => <<"0">>}}}, + Cmd), + C. + test_close(Transport, S, C0) -> CloseReason = <<"OK">>, CloseFrame = diff --git a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl index 0a5eb2b7d3..ede66b2d52 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl +++ b/deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl @@ -18,7 +18,10 @@ all() -> groups() -> [{non_parallel_tests, [], - [manage_super_stream, lookup_leader, partition_index]}]. + [manage_super_stream, + lookup_leader, + lookup_member, + partition_index]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -84,6 +87,17 @@ lookup_leader(Config) -> ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). +lookup_member(Config) -> + Stream = <<"stream_manager_lookup_member_stream">>, + ?assertMatch({ok, _}, create_stream(Config, Stream)), + + {ok, Pid} = lookup_member(Config, Stream), + ?assert(is_pid(Pid)), + + ?assertEqual({error, not_found}, lookup_member(Config, <<"foo">>)), + + ?assertEqual({ok, deleted}, delete_stream(Config, Stream)). + manage_super_stream(Config) -> % create super stream ?assertEqual(ok, @@ -215,6 +229,13 @@ lookup_leader(Config, Name) -> lookup_leader, [<<"/">>, Name]). +lookup_member(Config, Name) -> + rabbit_ct_broker_helpers:rpc(Config, + 0, + rabbit_stream_manager, + lookup_member, + [<<"/">>, Name]). + partitions(Config, Name) -> rabbit_ct_broker_helpers:rpc(Config, 0, diff --git a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl index a9c20101ee..c925ed7f5d 100644 --- a/deps/rabbitmq_stream_common/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream_common/include/rabbit_stream.hrl @@ -25,6 +25,7 @@ -define(COMMAND_PARTITIONS, 25). -define(COMMAND_CONSUMER_UPDATE, 26). -define(COMMAND_EXCHANGE_COMMAND_VERSIONS, 27). +-define(COMMAND_STREAM_INFO, 28). -define(REQUEST, 0). -define(RESPONSE, 1). diff --git a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl index 41f99b2bd5..34ad9f9232 100644 --- a/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl +++ b/deps/rabbitmq_stream_common/src/rabbit_stream_core.erl @@ -108,7 +108,8 @@ {consumer_update, subscription_id(), active()} | {exchange_command_versions, [{Command :: atom(), MinVersion :: command_version(), - MaxVersion :: command_version()}]}} | + MaxVersion :: command_version()}]} | + {stream_info, Stream :: binary()}} | {response, correlation_id(), {declare_publisher | delete_publisher | @@ -138,7 +139,8 @@ {consumer_update, response_code(), none | offset_spec()} | {exchange_command_versions, response_code(), [{Command :: atom(), MinVersion :: command_version(), - MaxVersion :: command_version()}]}} | + MaxVersion :: command_version()}]} | + {stream_info, response_code(), Info :: #{binary() => binary()}}} | {unknown, binary()}. -spec init(term()) -> state(). @@ -404,9 +406,9 @@ response_body({metadata = Tag, Endpoints, Metadata}) -> maps:fold(fun (Stream, Info, Acc) when is_atom(Info) -> Code = case Info of - stream_not_found -> + not_found -> ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST; - stream_not_available -> + not_available -> ?RESPONSE_CODE_STREAM_NOT_AVAILABLE end, StreamLength = byte_size(Stream), @@ -471,7 +473,20 @@ response_body({exchange_command_versions = Tag, Code, end, [], CommandVersions), {command_id(Tag), - [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}. + [<<Code:16, (length(CommandVersions)):32>>, CommandVersionsBin]}; +response_body({stream_info = Tag, Code, Info}) -> + Init = <<Code:16, (maps:size(Info)):32>>, + {command_id(Tag), + maps:fold(fun(Key, Value, Acc) -> + KeySize = byte_size(Key), + ValueSize = byte_size(Value), + <<Acc/binary, + KeySize:16, + Key:KeySize/binary, + ValueSize:16, + Value:ValueSize/binary>> + end, + Init, Info)}. request_body({declare_publisher = Tag, PublisherId, @@ -576,7 +591,9 @@ request_body({exchange_command_versions = Tag, CommandVersions}) -> end, [], CommandVersions), CommandVersionsLength = length(CommandVersions), - {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}. + {Tag, [<<CommandVersionsLength:32>>, CommandVersionsBin]}; +request_body({stream_info = Tag, Stream}) -> + {Tag, <<?STRING(Stream)>>}. append_data(Prev, Data) when is_binary(Prev) -> [Prev, Data]; @@ -843,6 +860,12 @@ parse_request(<<?REQUEST:1, CommandVersionsBin/binary>>) -> CommandVersions = parse_command_versions(CommandVersionsBin), request(CorrelationId, {exchange_command_versions, CommandVersions}); +parse_request(<<?REQUEST:1, + ?COMMAND_STREAM_INFO:15, + ?VERSION_1:16, + CorrelationId:32, + ?STRING(StreamSize, Stream)>>) -> + request(CorrelationId, {stream_info, Stream}); parse_request(Bin) -> {unknown, Bin}. @@ -928,7 +951,11 @@ parse_response_body(?COMMAND_EXCHANGE_COMMAND_VERSIONS, <<ResponseCode:16, _CommandVersionsCount:32, CommandVersionsBin/binary>>) -> CommandVersions = parse_command_versions(CommandVersionsBin), - {exchange_command_versions, ResponseCode, CommandVersions}. + {exchange_command_versions, ResponseCode, CommandVersions}; +parse_response_body(?COMMAND_STREAM_INFO, + <<ResponseCode:16, _Count:32, InfoBin/binary>>) -> + Info = parse_map(InfoBin, #{}), + {stream_info, ResponseCode, Info}. offset_spec(OffsetType, OffsetValueBin) -> case OffsetType of @@ -1079,7 +1106,9 @@ command_id(partitions) -> command_id(consumer_update) -> ?COMMAND_CONSUMER_UPDATE; command_id(exchange_command_versions) -> - ?COMMAND_EXCHANGE_COMMAND_VERSIONS. + ?COMMAND_EXCHANGE_COMMAND_VERSIONS; +command_id(stream_info) -> + ?COMMAND_STREAM_INFO. parse_command_id(?COMMAND_DECLARE_PUBLISHER) -> declare_publisher; @@ -1134,7 +1163,9 @@ parse_command_id(?COMMAND_PARTITIONS) -> parse_command_id(?COMMAND_CONSUMER_UPDATE) -> consumer_update; parse_command_id(?COMMAND_EXCHANGE_COMMAND_VERSIONS) -> - exchange_command_versions. + exchange_command_versions; +parse_command_id(?COMMAND_STREAM_INFO) -> + stream_info. element_index(Element, List) -> element_index(Element, List, 0). diff --git a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl index a37daceda8..fbc11d9370 100644 --- a/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl +++ b/deps/rabbitmq_stream_common/test/rabbit_stream_core_SUITE.erl @@ -108,6 +108,8 @@ roundtrip(_Config) -> test_roundtrip({request, 99, {exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_1}]}}), + test_roundtrip({request, 99, {stream_info, <<"stream_name">>}}), + %% RESPONSES [test_roundtrip({response, 99, {Tag, 53}}) || Tag @@ -140,6 +142,8 @@ roundtrip(_Config) -> test_roundtrip({response, 99, {exchange_command_versions, 1, [{publish, ?VERSION_1, ?VERSION_1}]}}), + test_roundtrip({response, 99, + {stream_info, 1, #{<<"committed_offset">> => <<"42">>}}}), ok. roundtrip_metadata(_Config) -> |