summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-17 10:15:09 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-17 10:15:09 +0100
commit1ed8f76eed90bf45a6a3d12e1434b7831ec4fdcf (patch)
treec488f3c2fef7748a690c1d0484ffb4d2e97f18df
parentd0d901f095e4d20e3560fb13feb1398ec236a84a (diff)
downloadrabbitmq-server-git-1ed8f76eed90bf45a6a3d12e1434b7831ec4fdcf.tar.gz
Check if member processes are alive in metadata command
In case the Mnesia record is stale.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl24
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl142
2 files changed, 93 insertions, 73 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 3981d94bd0..e418dd1022 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz
error;
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
case lists:member(Locator, [<<"client-local">>,
- <<"random">>,
- <<"least-leaders">>]) of
+ <<"random">>,
+ <<"least-leaders">>]) of
true ->
validate_stream_queue_arguments(T);
false ->
@@ -210,7 +210,25 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
{ok, Q} ->
case is_stream_queue(Q) of
true ->
- {ok, maps:with([leader_node, replica_nodes], amqqueue:get_type_state(Q))};
+ QState = amqqueue:get_type_state(Q),
+ ProcessAliveFun = fun(Pid) ->
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid], 10000)
+ end,
+ LeaderNode = case ProcessAliveFun(maps:get(leader_pid, QState)) of
+ true ->
+ maps:get(leader_node, QState);
+ _ ->
+ undefined
+ end,
+ ReplicaNodes = lists:foldl(fun(Pid, Acc) ->
+ case ProcessAliveFun(Pid) of
+ true ->
+ Acc ++ [node(Pid)];
+ _ ->
+ Acc
+ end
+ end, [], maps:get(replica_pids, QState)),
+ {ok, #{leader_node => LeaderNode, replica_nodes => ReplicaNodes}};
_ ->
{error, stream_not_found}
end;
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 4507c98fe6..d3b4820256 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -553,8 +553,8 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St
{Connection, State, Rest};
handle_frame_pre_auth(Transport,
#stream_connection{socket = S,
- authentication_state = AuthState0,
- host = Host} = Connection0, State,
+ authentication_state = AuthState0,
+ host = Host} = Connection0, State,
<<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
MechanismLength:16, Mechanism:MechanismLength/binary,
SaslFragment/binary>>, Rest) ->
@@ -578,39 +578,39 @@ handle_frame_pre_auth(Transport,
C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
{C2, FrameFragment} =
case AuthMechanism:handle_response(SaslBin, AuthState) of
- {refused, Username, Msg, Args} ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
- auth_fail(Username, Msg, Args, C1, State),
- rabbit_log:warning(Msg, Args),
- {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
- {protocol_error, Msg, Args} ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
- notify_auth_result(none, user_authentication_failure,
- [{error, rabbit_misc:format(Msg, Args)}],
- C1, State),
- rabbit_log:warning(Msg, Args),
- {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
- {challenge, Challenge, AuthState1} ->
- rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
- ChallengeSize = byte_size(Challenge),
- {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
- <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
- };
- {ok, User = #user{username = Username}} ->
- case rabbit_access_control:check_user_loopback(Username, S) of
- ok ->
- rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
- notify_auth_result(Username, user_authentication_success,
- [], C1, State),
- {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
- <<?RESPONSE_CODE_OK:16>>
- };
- not_allowed ->
- rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
- rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
- {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
- end
- end,
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ auth_fail(Username, Msg, Args, C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
+ ChallengeSize = byte_size(Challenge),
+ {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
+ <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
+ };
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, S) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
+ notify_auth_result(Username, user_authentication_success,
+ [], C1, State),
+ {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
+ <<?RESPONSE_CODE_OK:16>>
+ };
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
+ end
+ end,
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
frame(Transport, C1, Frame),
{C2, Rest};
@@ -689,9 +689,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
PublisherId:8/unsigned,
MessageCount:32, Messages/binary>>, Rest) ->
case rabbit_stream_utils:check_write_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
@@ -721,9 +721,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
case rabbit_stream_utils:check_read_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
{error, not_available} ->
@@ -851,13 +851,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct =
handle_frame_post_auth(_Transport, #stream_connection{virtual_host = VirtualHost, user = User} = Connection,
State,
<<?COMMAND_COMMIT_OFFSET:16, ?VERSION_0:16, _CorrelationId:32,
- ReferenceSize:16, Reference:ReferenceSize/binary,
- StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
+ ReferenceSize:16, Reference:ReferenceSize/binary,
+ StreamSize:16, Stream:StreamSize/binary, Offset:64>>, Rest) ->
case rabbit_stream_utils:check_write_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
ok ->
case lookup_leader(Stream, Connection) of
cluster_not_found ->
@@ -880,24 +880,24 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Offset} = case rabbit_stream_utils:check_read_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
- ok ->
- case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
- {error, not_found} ->
- {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
- {ok, LocalMemberPid} ->
- {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
- undefined ->
- 0;
- Offt ->
- Offt
- end}
- end;
- error ->
- {?RESPONSE_CODE_ACCESS_REFUSED, 0}
- end,
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
+ ok ->
+ case rabbit_stream_manager:lookup_local_member(VirtualHost, Stream) of
+ {error, not_found} ->
+ {?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
+ {ok, LocalMemberPid} ->
+ {?RESPONSE_CODE_OK, case osiris:read_tracking(LocalMemberPid, Reference) of
+ undefined ->
+ 0;
+ Offt ->
+ Offt
+ end}
+ end;
+ error ->
+ {?RESPONSE_CODE_ACCESS_REFUSED, 0}
+ end,
Transport:send(S, [<<FrameSize:32, ?COMMAND_QUERY_OFFSET:16, ?VERSION_0:16>>,
<<CorrelationId:32>>, <<ResponseCode:16>>, <<Offset:64>>]),
{Connection, State, Rest};
@@ -909,9 +909,9 @@ handle_frame_post_auth(Transport, #stream_connection{virtual_host = VirtualHost,
{ok, StreamName} ->
{Arguments, _Rest} = rabbit_stream_utils:parse_map(ArgumentsBinary, ArgumentsCount),
case rabbit_stream_utils:check_configure_permitted(
- #resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
+ #resource{name = StreamName, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
ok ->
case rabbit_stream_manager:create(VirtualHost, StreamName, Arguments, Username) of
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
@@ -940,9 +940,9 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
user = #user{username = Username} = User} = Connection, State,
<<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
case rabbit_stream_utils:check_configure_permitted(
- #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
- User,
- #{}) of
+ #resource{name = Stream, kind = queue, virtual_host = VirtualHost},
+ User,
+ #{}) of
ok ->
case rabbit_stream_manager:delete(VirtualHost, Stream, Username) of
{ok, deleted} ->
@@ -973,6 +973,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, virtual_host =
%% get the nodes involved in the streams
NodesMap = lists:foldl(fun(Stream, Acc) ->
case rabbit_stream_manager:topology(VirtualHost, Stream) of
+ {ok, #{leader_node := undefined, replica_nodes := ReplicaNodes}} ->
+ lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc, ReplicaNodes);
{ok, #{leader_node := LeaderNode, replica_nodes := ReplicaNodes}} ->
Acc1 = maps:put(LeaderNode, ok, Acc),
lists:foldl(fun(ReplicaNode, NodesAcc) -> maps:put(ReplicaNode, ok, NodesAcc) end, Acc1, ReplicaNodes);