summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-26 18:18:43 +0100
committerMichael Klishin <michael@clojurewerkz.org>2020-11-30 13:29:17 +0300
commite11ce7e152b13210a79ccc28426ffd99c4b4cb0d (patch)
tree4b34888e03f45f71da417b083404571628d6aac0
parent7721caf8a1f3c32d525edb1081c45b65756e8f47 (diff)
downloadrabbitmq-server-git-e11ce7e152b13210a79ccc28426ffd99c4b4cb0d.tar.gz
Include publisher state management
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl116
1 files changed, 80 insertions, 36 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 59e2284960..425d2dbb23 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -27,7 +27,8 @@
-record(publisher, {
publisher_id :: publisher_id(),
stream :: stream(),
- reference :: 'undefined' | publisher_reference()
+ reference :: 'undefined' | publisher_reference(),
+ leader :: pid()
}).
-record(consumer, {
@@ -731,7 +732,6 @@ handle_frame_post_auth(Transport, #stream_connection{
PublisherId:8,
ReferenceSize:16, Reference:ReferenceSize/binary,
StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
- %% FIXME check if the publisher ID and reference do not already exist
case rabbit_stream_utils:check_write_permitted(
#resource{name = Stream, kind = queue, virtual_host = VirtualHost},
User,
@@ -743,14 +743,15 @@ handle_frame_post_auth(Transport, #stream_connection{
cluster_not_found ->
response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
{Connection0, State, Rest};
- {_, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} ->
+ {ClusterLeader, #stream_connection{publishers = Publishers0, publisher_to_ids = RefIds0} = Connection1} ->
{PublisherReference, RefIds1} = case Reference of
<<"">> -> {undefined, RefIds0};
_ -> {Reference, RefIds0#{{Stream, Reference} => PublisherId}}
end,
Publisher = #publisher{publisher_id = PublisherId,
stream = Stream,
- reference = PublisherReference},
+ reference = PublisherReference,
+ leader = ClusterLeader},
response(Transport, Connection0, ?COMMAND_DECLARE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
{Connection1#stream_connection{publishers = Publishers0#{PublisherId => Publisher},
publisher_to_ids = RefIds1}, State, Rest}
@@ -772,8 +773,9 @@ handle_frame_post_auth(Transport, #stream_connection{publishers = Publishers,
Connection1 = Connection0#stream_connection{
publishers = maps:remove(PublisherId, Publishers),
publisher_to_ids = maps:remove({Stream, Ref}, PubToIds)},
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
response(Transport, Connection1, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_OK),
- {Connection1, State, Rest};
+ {Connection2, State, Rest};
_ ->
response(Transport, Connection0, ?COMMAND_DELETE_PUBLISHER, CorrelationId, ?RESPONSE_CODE_PUBLISHER_DOES_NOT_EXIST),
{Connection0, State, Rest}
@@ -893,8 +895,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
response(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_ACCESS_REFUSED),
{Connection, State, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions,
- stream_leaders = StreamLeaders} = Connection,
+handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = StreamSubscriptions} = Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned>>, Rest) ->
case subscription_exists(StreamSubscriptions, SubscriptionId) of
@@ -906,28 +907,19 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre
Stream = Consumer#consumer.stream,
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
- {Connection1, StreamSubscriptions1, StreamLeaders1} =
+ StreamSubscriptions1 =
case length(SubscriptionsForThisStream1) of
0 ->
- %% no more subscriptions for this stream
- %% we unregister even though it could affect publishing if the stream is published to
- %% from this connection and is deleted.
- %% to mitigate this, we remove the stream from the leaders cache
- %% this way the stream leader will be looked up in the next publish command
- %% and registered to.
- C = demonitor_stream(Stream, Connection),
- {C, maps:remove(Stream, StreamSubscriptions),
- maps:remove(Stream, StreamLeaders)
- };
+ % no more subscription for this stream
+ maps:remove(Stream, StreamSubscriptions);
_ ->
- {Connection, StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, StreamLeaders}
+ StreamSubscriptions#{Stream => SubscriptionsForThisStream1}
end,
+ Connection1 = Connection#stream_connection{stream_subscriptions = StreamSubscriptions1},
Consumers1 = maps:remove(SubscriptionId, Consumers),
+ Connection2 = maybe_clean_connection_from_stream(Stream, Connection1),
response_ok(Transport, Connection, ?COMMAND_SUBSCRIBE, CorrelationId),
- {Connection1#stream_connection{
- stream_subscriptions = StreamSubscriptions1,
- stream_leaders = StreamLeaders1
- }, State#stream_connection_state{consumers = Consumers1}, Rest}
+ {Connection2, State#stream_connection_state{consumers = Consumers1}, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection,
#stream_connection_state{consumers = Consumers} = State,
@@ -1213,21 +1205,49 @@ handle_frame_post_close(_Transport, Connection, State, Frame, Rest) ->
rabbit_log:warning("ignored frame on close ~p ~p.~n", [Frame, Rest]),
{Connection, State, Rest}.
-clean_state_after_stream_deletion_or_failure(Stream, #stream_connection{stream_leaders = StreamLeaders, stream_subscriptions = StreamSubscriptions} = Connection,
- #stream_connection_state{consumers = Consumers} = State) ->
- case {maps:is_key(Stream, StreamSubscriptions), maps:is_key(Stream, StreamLeaders)} of
- {true, _} ->
+clean_state_after_stream_deletion_or_failure(Stream,
+ #stream_connection{stream_subscriptions = StreamSubscriptions,
+ publishers = Publishers,
+ publisher_to_ids = PublisherToIds,
+ stream_leaders = Leaders} = C0,
+ #stream_connection_state{consumers = Consumers} = S0) ->
+ {SubscriptionsCleaned, C1, S1} = case stream_has_subscriptions(Stream, C0) of
+ true ->
#{Stream := SubscriptionIds} = StreamSubscriptions,
- {cleaned, Connection#stream_connection{
- stream_leaders = maps:remove(Stream, StreamLeaders),
+ {true, C0#stream_connection{
stream_subscriptions = maps:remove(Stream, StreamSubscriptions)
- }, State#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
- {false, true} ->
- {cleaned, Connection#stream_connection{
- stream_leaders = maps:remove(Stream, StreamLeaders)
- }, State};
- {false, false} ->
- {not_cleaned, Connection, State}
+ }, S0#stream_connection_state{consumers = maps:without(SubscriptionIds, Consumers)}};
+ false ->
+ {false, C0, S0}
+ end,
+ {PublishersCleaned, C2, S2} = case stream_has_publishers(Stream, C1) of
+ true ->
+ {PurgedPubs, PurgedPubToIds} = maps:fold(
+ fun(PubId, #publisher{stream = S, reference = Ref}, {Pubs, PubToIds}) ->
+ case S of
+ Stream ->
+ {maps:remove(PubId, Pubs), maps:remove({Stream, Ref}, PubToIds)};
+ _ ->
+ {Pubs, PubToIds}
+ end
+ end, {Publishers, PublisherToIds}, Publishers),
+ {true, C1#stream_connection{publishers = PurgedPubs, publisher_to_ids = PurgedPubToIds},
+ S1};
+ false ->
+ {false, C1, S1}
+ end,
+ {LeadersCleaned, Leaders1} = case Leaders of
+ #{Stream := _} ->
+ {true, maps:remove(Stream, Leaders)};
+ _ ->
+ {false, Leaders}
+ end,
+ case SubscriptionsCleaned orelse PublishersCleaned orelse LeadersCleaned of
+ true ->
+ C3 = demonitor_stream(Stream, C2),
+ {cleaned, C3#stream_connection{stream_leaders = Leaders1}, S2};
+ false ->
+ {not_cleaned, C2#stream_connection{stream_leaders = Leaders1}, S2}
end.
lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual_host = VirtualHost} = Connection) ->
@@ -1247,6 +1267,16 @@ lookup_leader(Stream, #stream_connection{stream_leaders = StreamLeaders, virtual
lookup_leader_from_manager(VirtualHost, Stream) ->
rabbit_stream_manager:lookup_leader(VirtualHost, Stream).
+maybe_clean_connection_from_stream(Stream, #stream_connection{stream_leaders = Leaders} = Connection0) ->
+ Connection1 =
+ case {stream_has_publishers(Stream, Connection0), stream_has_subscriptions(Stream, Connection0)} of
+ {false, false} ->
+ demonitor_stream(Stream, Connection0);
+ _ ->
+ Connection0
+ end,
+ Connection1#stream_connection{stream_leaders = maps:remove(Stream, Leaders)}.
+
maybe_monitor_stream(Pid, Stream, #stream_connection{monitors = Monitors} = Connection) ->
case lists:member(Stream, maps:values(Monitors)) of
true ->
@@ -1260,6 +1290,7 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection)
Monitors = maps:fold(fun(MonitorRef, Strm, Acc) ->
case Strm of
Stream ->
+ demonitor(MonitorRef, [flush]),
Acc;
_ ->
maps:put(MonitorRef, Strm, Acc)
@@ -1268,6 +1299,19 @@ demonitor_stream(Stream, #stream_connection{monitors = Monitors0} = Connection)
end, #{}, Monitors0),
Connection#stream_connection{monitors = Monitors}.
+stream_has_subscriptions(Stream, #stream_connection{stream_subscriptions = Subscriptions}) ->
+ case Subscriptions of
+ #{Stream := StreamSubscriptions} when length(StreamSubscriptions) > 0 ->
+ true;
+ _ ->
+ false
+ end.
+
+stream_has_publishers(Stream, #stream_connection{publishers = Publishers}) ->
+ lists:any(fun(#publisher{stream = S}) ->
+ case S of Stream -> true; _ -> false end
+ end, maps:values(Publishers)).
+
demonitor_all_streams(#stream_connection{monitors = Monitors} = Connection) ->
lists:foreach(fun(MonitorRef) ->
demonitor(MonitorRef, [flush])