+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-compile({no_auto_import, [apply/3]}).
+ init/1,
+ apply/3,
+ state_enter/2,
+ tick/2,
+ overview/1,
+ get_checked_out/4,
+ %% versioning
+ version/0,
+ which_module/1,
+ %% aux
+ init_aux/1,
+ handle_aux/6,
+ % queries
+ query_messages_ready/1,
+ query_messages_checked_out/1,
+ query_messages_total/1,
+ query_processes/1,
+ query_ra_indexes/1,
+ query_consumer_count/1,
+ query_consumers/1,
+ query_stat/1,
+ query_single_active_consumer/1,
+ query_in_memory_usage/1,
+ query_peek/2,
+ usage/1,
+ zero/1,
+ %% misc
+ dehydrate_state/1,
+ normalize/1,
+ %% protocol helpers
+ make_enqueue/3,
+ make_register_enqueuer/1,
+ make_checkout/3,
+ make_settle/2,
+ make_return/2,
+ make_discard/2,
+ make_credit/4,
+ make_purge/0,
+ make_purge_nodes/1,
+ make_update_config/1,
+ make_garbage_collection/0
+ ]).
+%% command records representing all the protocol actions that are supported
+-record(enqueue, {pid :: option(pid()),
+ seq :: option(msg_seqno()),
+ msg :: raw_msg()}).
+-record(register_enqueuer, {pid :: pid()}).
+-record(checkout, {consumer_id :: consumer_id(),
+ spec :: checkout_spec(),
+ meta :: consumer_meta()}).
+-record(settle, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(return, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(discard, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(credit, {consumer_id :: consumer_id(),
+ credit :: non_neg_integer(),
+ delivery_count :: non_neg_integer(),
+ drain :: boolean()}).
+-record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
+-record(update_config, {config :: config()}).
+-record(garbage_collection, {}).
+-opaque protocol() ::
+ #enqueue{} |
+ #register_enqueuer{} |
+ #checkout{} |
+ #settle{} |
+ #return{} |
+ #discard{} |
+ #credit{} |
+ #purge{} |
+ #purge_nodes{} |
+ #update_config{} |
+ #garbage_collection{}.
+-type command() :: protocol() | ra_machine:builtin_command().
+%% all the command types supported by ra fifo
+-type client_msg() :: delivery().
+%% the messages `rabbit_fifo' can send to consumers.
+-opaque state() :: #?MODULE{}.
+ delivery/0,
+ command/0,
+ credit_mode/0,
+ consumer_tag/0,
+ consumer_meta/0,
+ consumer_id/0,
+ client_msg/0,
+ msg/0,
+ msg_id/0,
+ msg_seqno/0,
+ delivery_msg/0,
+ state/0,
+ config/0]).
+-spec init(config()) -> state().
+init(#{name := Name,
+ queue_resource := Resource} = Conf) ->
+ update_config(Conf, #?MODULE{cfg = #cfg{name = Name,
+ resource = Resource}}).
+update_config(Conf, State) ->
+ DLH = maps:get(dead_letter_handler, Conf, undefined),
+ BLH = maps:get(become_leader_handler, Conf, undefined),
+ RCI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
+ Overflow = maps:get(overflow_strategy, Conf, drop_head),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
+ MaxMemoryLength = maps:get(max_in_memory_length, Conf, undefined),
+ MaxMemoryBytes = maps:get(max_in_memory_bytes, Conf, undefined),
+ DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
+ Expires = maps:get(expires, Conf, undefined),
+ ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
+ true ->
+ single_active;
+ false ->
+ competing
+ end,
+ Cfg = State#?MODULE.cfg,
+ RCISpec = {RCI, RCI},
+ LastActive = maps:get(created, Conf, undefined),
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCISpec,
+ dead_letter_handler = DLH,
+ become_leader_handler = BLH,
+ overflow_strategy = Overflow,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
+ max_in_memory_length = MaxMemoryLength,
+ max_in_memory_bytes = MaxMemoryBytes,
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit,
+ expires = Expires},
+ last_active = LastActive}.
+zero(_) ->
+ 0.
+% msg_ids are scoped per consumer
+% ra_indexes holds all raft indexes for enqueues currently on queue
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Meta, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Meta, From, Seq, RawMsg, State00);
+apply(_Meta, #register_enqueuer{pid = Pid},
+ #?MODULE{enqueuers = Enqueuers0,
+ cfg = #cfg{overflow_strategy = Overflow}} = State0) ->
+ State = case maps:is_key(Pid, Enqueuers0) of
+ true ->
+ %% if the enqueuer exits just echo the overflow state
+ State0;
+ false ->
+ State0#?MODULE{enqueuers = Enqueuers0#{Pid => #enqueuer{}}}
+ end,
+ Res = case is_over_limit(State) of
+ true when Overflow == reject_publish ->
+ reject_publish;
+ _ ->
+ ok
+ end,
+ {State, Res, [{monitor, process, Pid}]};
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?MODULE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ % need to increment metrics before completing as any snapshot
+ % states taken need to include them
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
+ Con0, [], State);
+ _ ->
+ {State, ok}
+ end;
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?MODULE{consumers = Cons0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := Con0} ->
+ Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
+ Effects = dead_letter_effects(rejected, Discarded, State0, []),
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
+ Effects, State0);
+ _ ->
+ {State0, ok}
+ end;
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+ #?MODULE{consumers = Cons0} = State) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{checked_out = Checked0}} ->
+ Returned = maps:with(MsgIds, Checked0),
+ return(Meta, ConsumerId, Returned, [], State);
+ _ ->
+ {State, ok}
+ end;
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
+ #?MODULE{consumers = Cons0,
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
+ case Cons0 of
+ #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
+ %% this can go below 0 when credit is reduced
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ %% grant the credit
+ Con1 = Con0#consumer{credit = C},
+ ServiceQueue = maybe_queue_consumer(ConsumerId, Con1,
+ ServiceQueue0),
+ Cons = maps:put(ConsumerId, Con1, Cons0),
+ {State1, ok, Effects} =
+ checkout(Meta, State0,
+ State0#?MODULE{service_queue = ServiceQueue,
+ consumers = Cons}, []),
+ Response = {send_credit_reply, messages_ready(State1)},
+ %% by this point all checkouts for the updated credit value
+ %% should be processed so we can evaluate the drain
+ case Drain of
+ false ->
+ %% just return the result of the checkout
+ {State1, Response, Effects};
+ true ->
+ Con = #consumer{credit = PostCred} =
+ maps:get(ConsumerId, State1#?MODULE.consumers),
+ %% add the outstanding credit to the delivery count
+ DeliveryCount = Con#consumer.delivery_count + PostCred,
+ Consumers = maps:put(ConsumerId,
+ Con#consumer{delivery_count = DeliveryCount,
+ credit = 0},
+ State1#?MODULE.consumers),
+ Drained =,
+ {CTag, _} = ConsumerId,
+ {State1#?MODULE{consumers = Consumers},
+ %% returning a multi response with two client actions
+ %% for the channel to execute
+ {multi, [Response, {send_drained, {CTag, Drained}}]},
+ Effects}
+ end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consuemrs
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ State = State0#?MODULE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
+ _ ->
+ %% credit for unknown consumer - just ignore
+ {State0, ok}
+ end;
+apply(_, #checkout{spec = {dequeue, _}},
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active}} = State0) ->
+ {State0, {error, {unsupported, single_active_consumer}}};
+apply(#{index := Index,
+ system_time := Ts,
+ from := From} = Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
+ consumer_id = ConsumerId},
+ #?MODULE{consumers = Consumers} = State00) ->
+ %% dequeue always updates last_active
+ State0 = State00#?MODULE{last_active = Ts},
+ %% all dequeue operations result in keeping the queue from expiring
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ Ready ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, 0,
+ State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(Meta, State1),
+ {State4, Effects1} = case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State3, _, Effects0} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State3, Effects0}
+ end,
+ {Reply, Effects2} =
+ case Msg of
+ {RaftIdx, {Header, empty}} ->
+ %% TODO add here new log effect with reply
+ {'$ra_no_reply',
+ [reply_log_effect(RaftIdx, MsgId, Header, Ready - 1, From) |
+ Effects1]};
+ _ ->
+ {{dequeue, {MsgId, Msg}, Ready-1}, Effects1}
+ end,
+ case evaluate_limit(Index, false, State0, State4, Effects2) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, Reply, State, Effects);
+ {State, false, Effects} ->
+ {State, Reply, Effects}
+ end
+ end;
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+ {State, Effects} = cancel_consumer(Meta, ConsumerId, State0, [],
+ consumer_cancel),
+ checkout(Meta, State0, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
+ State0) ->
+ Priority = get_priority_from_args(ConsumerMeta),
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, Priority, State0),
+ checkout(Meta, State0, State1, [{monitor, process, Pid}]);
+apply(#{index := Index}, #purge{},
+ #?MODULE{ra_indexes = Indexes0,
+ returns = Returns,
+ messages = Messages} = State0) ->
+ Total = messages_ready(State0),
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ [I || {_, {I, _}} <- lqueue:to_list(Messages)]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
+ State1 = State0#?MODULE{ra_indexes = Indexes,
+ messages = lqueue:new(),
+ returns = lqueue:new(),
+ msg_bytes_enqueue = 0,
+ prefix_msgs = {0, [], 0, []},
+ msg_bytes_in_memory = 0,
+ msgs_ready_in_memory = 0},
+ Effects0 = [garbage_collection],
+ Reply = {purge, Total},
+ {State, _, Effects} = evaluate_limit(Index, false, State0,
+ State1, Effects0),
+ update_smallest_raft_index(Index, Reply, State, Effects);
+apply(_Meta, #garbage_collection{}, State) ->
+ {State, ok, [{aux, garbage_collection}]};
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0,
+ enqueuers = Enqs0} = State0) ->
+ Node = node(Pid),
+ %% if the pid refers to an active or cancelled consumer,
+ %% mark it as suspected and return it to the waiting queue
+ {State1, Effects0} =
+ maps:fold(fun({_, P} = Cid, C0, {S0, E0})
+ when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ S0, Cid, C0, false, suspected_down, E0),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(Meta, S0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?MODULE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?MODULE{consumers = maps:remove(Cid, St#?MODULE.consumers),
+ waiting_consumers = Waiting,
+ last_active = Ts},
+ Effs1};
+ (_, _, S) ->
+ S
+ end, {State0, []}, Cons0),
+ WaitingConsumers = update_waiting_consumer_status(Node, State1,
+ suspected_down),
+ %% select a new consumer from the waiting queue and run a checkout
+ State2 = State1#?MODULE{waiting_consumers = WaitingConsumers},
+ {State, Effects1} = activate_next_consumer(State2, Effects0),
+ %% mark any enquers as suspected
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ Effects = [{monitor, node, Node} | Effects1],
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(#{system_time := Ts} = Meta, {down, Pid, noconnection},
+ #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ %% A node has been disconnected. This doesn't necessarily mean that
+ %% any processes on this node are down, they _may_ come back so here
+ %% we just mark them as suspected (effectively deactivated)
+ %% and return all checked out messages to the main queue for delivery to any
+ %% live consumers
+ %%
+ %% all pids for the disconnected node will be marked as suspected not just
+ %% the one we got the `down' command for
+ Node = node(Pid),
+ {State, Effects1} =
+ maps:fold(
+ fun({_, P} = Cid, #consumer{checked_out = Checked0,
+ status = up} = C0,
+ {St0, Eff}) when node(P) =:= Node ->
+ Credit = increase_credit(C0, map_size(Checked0)),
+ C = C0#consumer{status = suspected_down,
+ credit = Credit},
+ {St, Eff0} = return_all(Meta, St0, Eff, Cid, C),
+ Eff1 = consumer_update_active_effects(St, Cid, C, false,
+ suspected_down, Eff0),
+ {St, Eff1};
+ (_, _, {St, Eff}) ->
+ {St, Eff}
+ end, {State0, []}, Cons0),
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ % Monitor the node so that we can "unsuspect" these processes when the node
+ % comes back, then re-issue all monitors and discover the final fate of
+ % these processes
+ Effects = case maps:size(State#?MODULE.consumers) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
+ checkout(Meta, State0, State#?MODULE{enqueuers = Enqs,
+ last_active = Ts}, Effects);
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Meta, Pid, State0),
+ checkout(Meta, State0, State, Effects);
+apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = _SQ0} = State0) ->
+ %% A node we are monitoring has come back.
+ %% If we have suspected any processes of being
+ %% down we should now re-issue the monitors for them to detect if they're
+ %% actually down or not
+ Monitors = [{monitor, process, P}
+ || P <- suspected_pids_for(Node, State0)],
+ Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = up};
+ (_, E) -> E
+ end, Enqs0),
+ ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
+ %% mark all consumers as up
+ {State1, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {SAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(SAcc, ConsumerId,
+ C, true, up, EAcc),
+ {update_or_remove_sub(Meta, ConsumerId,
+ C#consumer{status = up},
+ SAcc), EAcc1};
+ (_, _, Acc) ->
+ Acc
+ end, {State0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State1, up),
+ State2 = State1#?MODULE{
+ enqueuers = Enqs1,
+ waiting_consumers = Waiting},
+ {State, Effects} = activate_next_consumer(State2, Effects1),
+ checkout(Meta, State0, State, Effects);
+apply(_, {nodedown, _Node}, State) ->
+ {State, ok};
+apply(Meta, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Meta, Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, State, update_config(Conf, State), []);
+apply(_Meta, {machine_version, 0, 1}, V0State) ->
+ State = convert_v0_to_v1(V0State),
+ {State, ok, []}.
+convert_v0_to_v1(V0State0) ->
+ V0State = rabbit_fifo_v0:normalize_for_v1(V0State0),
+ V0Msgs = rabbit_fifo_v0:get_field(messages, V0State),
+ V1Msgs = lqueue:from_list(lists:sort(maps:to_list(V0Msgs))),
+ V0Enqs = rabbit_fifo_v0:get_field(enqueuers, V0State),
+ V1Enqs = maps:map(
+ fun (_EPid, E) ->
+ #enqueuer{next_seqno = element(2, E),
+ pending = element(3, E),
+ status = element(4, E)}
+ end, V0Enqs),
+ V0Cons = rabbit_fifo_v0:get_field(consumers, V0State),
+ V1Cons = maps:map(
+ fun (_CId, C0) ->
+ %% add the priority field
+ list_to_tuple(tuple_to_list(C0) ++ [0])
+ end, V0Cons),
+ V0SQ = rabbit_fifo_v0:get_field(service_queue, V0State),
+ V1SQ = priority_queue:from_list(queue:to_list(V0SQ)),
+ Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
+ resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
+ release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
+ dead_letter_handler = rabbit_fifo_v0:get_cfg_field(dead_letter_handler, V0State),
+ become_leader_handler = rabbit_fifo_v0:get_cfg_field(become_leader_handler, V0State),
+ %% TODO: what if policy enabling reject_publish was applied before conversion?
+ overflow_strategy = drop_head,
+ max_length = rabbit_fifo_v0:get_cfg_field(max_length, V0State),
+ max_bytes = rabbit_fifo_v0:get_cfg_field(max_bytes, V0State),
+ consumer_strategy = rabbit_fifo_v0:get_cfg_field(consumer_strategy, V0State),
+ delivery_limit = rabbit_fifo_v0:get_cfg_field(delivery_limit, V0State),
+ max_in_memory_length = rabbit_fifo_v0:get_cfg_field(max_in_memory_length, V0State),
+ max_in_memory_bytes = rabbit_fifo_v0:get_cfg_field(max_in_memory_bytes, V0State)
+ },
+ #?MODULE{cfg = Cfg,
+ messages = V1Msgs,
+ next_msg_num = rabbit_fifo_v0:get_field(next_msg_num, V0State),
+ returns = rabbit_fifo_v0:get_field(returns, V0State),
+ enqueue_count = rabbit_fifo_v0:get_field(enqueue_count, V0State),
+ enqueuers = V1Enqs,
+ ra_indexes = rabbit_fifo_v0:get_field(ra_indexes, V0State),
+ release_cursors = rabbit_fifo_v0:get_field(release_cursors, V0State),
+ consumers = V1Cons,
+ service_queue = V1SQ,
+ prefix_msgs = rabbit_fifo_v0:get_field(prefix_msgs, V0State),
+ msg_bytes_enqueue = rabbit_fifo_v0:get_field(msg_bytes_enqueue, V0State),
+ msg_bytes_checkout = rabbit_fifo_v0:get_field(msg_bytes_checkout, V0State),
+ waiting_consumers = rabbit_fifo_v0:get_field(waiting_consumers, V0State),
+ msg_bytes_in_memory = rabbit_fifo_v0:get_field(msg_bytes_in_memory, V0State),
+ msgs_ready_in_memory = rabbit_fifo_v0:get_field(msgs_ready_in_memory, V0State)
+ }.
+purge_node(Meta, Node, State, Effects) ->
+ lists:foldl(fun(Pid, {S0, E0}) ->
+ {S, E} = handle_down(Meta, Pid, S0),
+ {S, E0 ++ E}
+ end, {State, Effects}, all_pids_for(Node, State)).
+%% any downs that re not noconnection
+handle_down(Meta, Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(Meta, ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
+ fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
+ consumer_update_active_effects(State, ConsumerId, Consumer, Active,
+ ActivityStatus, Effects)
+ end;
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = single_active}}) ->
+ fun(_, _, _, _, _, Effects) ->
+ Effects
+ end.
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
+ {[], State};
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State) ->
+ {[], State};
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ % get cancel effects for down waiting consumers
+ Down = lists:filter(fun({{_, P}, _}) -> P =:= Pid end,
+ WaitingConsumers0),
+ Effects = lists:foldl(fun ({ConsumerId, _}, Effects) ->
+ cancel_consumer_effects(ConsumerId, State0,
+ Effects)
+ end, [], Down),
+ % update state to have only up waiting consumers
+ StillUp = lists:filter(fun({{_, P}, _}) -> P =/= Pid end,
+ WaitingConsumers0),
+ State = State0#?MODULE{waiting_consumers = StillUp},
+ {Effects, State}.
+ #?MODULE{waiting_consumers = WaitingConsumers},
+ Status) ->
+ [begin
+ case node(Pid) of
+ Node ->
+ {ConsumerId, Consumer#consumer{status = Status}};
+ _ ->
+ {ConsumerId, Consumer}
+ end
+ end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.status =/= cancelled].
+-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
+state_enter(leader, #?MODULE{consumers = Cons,
+ enqueuers = Enqs,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{name = Name,
+ resource = Resource,
+ become_leader_handler = BLH},
+ prefix_msgs = {0, [], 0, []}
+ }) ->
+ % return effects to monitor all current consumers and enqueuers
+ Pids = lists:usort(maps:keys(Enqs)
+ ++ [P || {_, P} <- maps:keys(Cons)]
+ ++ [P || {{_, P}, _} <- WaitingConsumers]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]),
+ FHReservation = [{mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]}],
+ Effects = Mons ++ Nots ++ NodeMons ++ FHReservation,
+ case BLH of
+ undefined ->
+ Effects;
+ {Mod, Fun, Args} ->
+ [{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
+ end;
+state_enter(eol, #?MODULE{enqueuers = Enqs,
+ consumers = Custs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
+ WaitingConsumers1 = lists:foldl(fun({{_, P}, V}, Acc) -> Acc#{P => V} end,
+ #{}, WaitingConsumers0),
+ AllConsumers = maps:merge(Custs, WaitingConsumers1),
+ [{send_msg, P, eol, ra_event}
+ || P <- maps:keys(maps:merge(Enqs, AllConsumers))] ++
+ [{mod_call, rabbit_quorum_queue, file_handle_release_reservation, []}];
+state_enter(State, #?MODULE{cfg = #cfg{resource = _Resource}}) when State =/= leader ->
+ FHReservation = {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []},
+ [FHReservation];
+ state_enter(_, _) ->
+ %% catch all as not handling all states
+ [].
+-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
+tick(Ts, #?MODULE{cfg = #cfg{name = Name,
+ resource = QName},
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes} = State) ->
+ case is_expired(Ts, State) of
+ true ->
+ [{mod_call, rabbit_quorum_queue, spawn_deleter, [QName]}];
+ false ->
+ Metrics = {Name,
+ messages_ready(State),
+ num_checked_out(State), % checked out
+ messages_total(State),
+ query_consumer_count(State), % Consumers
+ EnqueueBytes,
+ CheckoutBytes},
+ [{mod_call, rabbit_quorum_queue,
+ handle_tick, [QName, Metrics, all_nodes(State)]}]
+ end.
+-spec overview(state()) -> map().
+overview(#?MODULE{consumers = Cons,
+ enqueuers = Enqs,
+ release_cursors = Cursors,
+ enqueue_count = EnqCount,
+ msg_bytes_enqueue = EnqueueBytes,
+ msg_bytes_checkout = CheckoutBytes,
+ cfg = Cfg} = State) ->
+ Conf = #{name =>,
+ resource => Cfg#cfg.resource,
+ release_cursor_interval => Cfg#cfg.release_cursor_interval,
+ dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
+ max_length => Cfg#cfg.max_length,
+ max_bytes => Cfg#cfg.max_bytes,
+ consumer_strategy => Cfg#cfg.consumer_strategy,
+ max_in_memory_length => Cfg#cfg.max_in_memory_length,
+ max_in_memory_bytes => Cfg#cfg.max_in_memory_bytes,
+ expires => Cfg#cfg.expires,
+ delivery_limit => Cfg#cfg.delivery_limit
+ },
+ #{type => ?MODULE,
+ config => Conf,
+ num_consumers => maps:size(Cons),
+ num_checked_out => num_checked_out(State),
+ num_enqueuers => maps:size(Enqs),
+ num_ready_messages => messages_ready(State),
+ num_messages => messages_total(State),
+ num_release_cursors => lqueue:len(Cursors),
+ release_cursors => [I || {_, I, _} <- lqueue:to_list(Cursors)],
+ release_cursor_enqueue_counter => EnqCount,
+ enqueue_message_bytes => EnqueueBytes,
+ checkout_message_bytes => CheckoutBytes}.
+-spec get_checked_out(consumer_id(), msg_id(), msg_id(), state()) ->
+ [delivery_msg()].
+get_checked_out(Cid, From, To, #?MODULE{consumers = Consumers}) ->
+ case Consumers of
+ #{Cid := #consumer{checked_out = Checked}} ->
+ [{K, snd(snd(maps:get(K, Checked)))}
+ || K <- lists:seq(From, To),
+ maps:is_key(K, Checked)];
+ _ ->
+ []
+ end.
+-spec version() -> pos_integer().
+version() -> 1.
+which_module(0) -> rabbit_fifo_v0;
+which_module(1) -> ?MODULE.
+-record(aux_gc, {last_raft_idx = 0 :: ra:index()}).
+-record(aux, {name :: atom(),
+ utilisation :: term(),
+ gc = #aux_gc{} :: #aux_gc{}}).
+init_aux(Name) when is_atom(Name) ->
+ %% TODO: catch specific exception throw if table already exists
+ ok = ra_machine_ets:create_table(rabbit_fifo_usage,
+ [named_table, set, public,
+ {write_concurrency, true}]),
+ Now = erlang:monotonic_time(micro_seconds),
+ #aux{name = Name,
+ utilisation = {inactive, Now, 1, 1.0}}.
+handle_aux(leader, _, garbage_collection, State, Log, _MacState) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ {no_reply, State, Log};
+handle_aux(follower, _, garbage_collection, State, Log, MacState) ->
+ ra_log_wal:force_roll_over(ra_log_wal),
+ {no_reply, force_eval_gc(Log, MacState, State), Log};
+handle_aux(_RaState, cast, eval, Aux0, Log, _MacState) ->
+ {no_reply, Aux0, Log};
+handle_aux(_RaState, cast, Cmd, #aux{utilisation = Use0} = Aux0,
+ Log, _MacState)
+ when Cmd == active orelse Cmd == inactive ->
+ {no_reply, Aux0#aux{utilisation = update_use(Use0, Cmd)}, Log};
+handle_aux(_RaState, cast, tick, #aux{name = Name,
+ utilisation = Use0} = State0,
+ Log, MacState) ->
+ true = ets:insert(rabbit_fifo_usage,
+ {Name, utilisation(Use0)}),
+ Aux = eval_gc(Log, MacState, State0),
+ {no_reply, Aux, Log};
+handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
+ Log0, MacState) ->
+ case rabbit_fifo:query_peek(Pos, MacState) of
+ {ok, {Idx, {Header, empty}}} ->
+ %% need to re-hydrate from the log
+ {{_, _, {_, _, Cmd, _}}, Log} = ra_log:fetch(Idx, Log0),
+ #enqueue{msg = Msg} = Cmd,
+ {reply, {ok, {Header, Msg}}, Aux0, Log};
+ {ok, {_Idx, {Header, Msg}}} ->
+ {reply, {ok, {Header, Msg}}, Aux0, Log0};
+ Err ->
+ {reply, Err, Aux0, Log0}
+ end.
+eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}} = MacState,
+ #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+ {Idx, _} = ra_log:last_index_term(Log),
+ {memory, Mem} = erlang:process_info(self(), memory),
+ case messages_total(MacState) of
+ 0 when Idx > LastGcIdx andalso
+ Mem > ?GC_MEM_LIMIT_B ->
+ garbage_collect(),
+ {memory, MemAfter} = erlang:process_info(self(), memory),
+ rabbit_log:debug("~s: full GC sweep complete. "
+ "Process memory changed from ~.2fMB to ~.2fMB.",
+ [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]),
+ AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+ _ ->
+ AuxState
+ end.
+force_eval_gc(Log, #?MODULE{cfg = #cfg{resource = QR}},
+ #aux{gc = #aux_gc{last_raft_idx = LastGcIdx} = Gc} = AuxState) ->
+ {Idx, _} = ra_log:last_index_term(Log),
+ {memory, Mem} = erlang:process_info(self(), memory),
+ case Idx > LastGcIdx of
+ true ->
+ garbage_collect(),
+ {memory, MemAfter} = erlang:process_info(self(), memory),
+ rabbit_log:debug("~s: full GC sweep complete. "
+ "Process memory changed from ~.2fMB to ~.2fMB.",
+ [rabbit_misc:rs(QR), Mem/?MB, MemAfter/?MB]),
+ AuxState#aux{gc = Gc#aux_gc{last_raft_idx = Idx}};
+ false ->
+ AuxState
+ end.
+%%% Queries
+query_messages_ready(State) ->
+ messages_ready(State).
+query_messages_checked_out(#?MODULE{consumers = Consumers}) ->
+ maps:fold(fun (_, #consumer{checked_out = C}, S) ->
+ maps:size(C) + S
+ end, 0, Consumers).
+query_messages_total(State) ->
+ messages_total(State).
+query_processes(#?MODULE{enqueuers = Enqs, consumers = Cons0}) ->
+ Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
+ maps:keys(maps:merge(Enqs, Cons)).
+query_ra_indexes(#?MODULE{ra_indexes = RaIndexes}) ->
+ RaIndexes.
+query_consumer_count(#?MODULE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers}) ->
+ Up = maps:filter(fun(_ConsumerId, #consumer{status = Status}) ->
+ Status =/= suspected_down
+ end, Consumers),
+ maps:size(Up) + length(WaitingConsumers).
+query_consumers(#?MODULE{consumers = Consumers,
+ waiting_consumers = WaitingConsumers,
+ cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
+ ActiveActivityStatusFun =
+ case ConsumerStrategy of
+ competing ->
+ fun(_ConsumerId,
+ #consumer{status = Status}) ->
+ case Status of
+ suspected_down ->
+ {false, Status};
+ _ ->
+ {true, Status}
+ end
+ end;
+ single_active ->
+ SingleActiveConsumer = query_single_active_consumer(State),
+ fun({Tag, Pid} = _Consumer, _) ->
+ case SingleActiveConsumer of
+ {value, {Tag, Pid}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end
+ end
+ end,
+ FromConsumers =
+ maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
+ Acc;
+ ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, Consumers),
+ FromWaitingConsumers =
+ lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
+ Acc;
+ ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
+ {Active, ActivityStatus} =
+ ActiveActivityStatusFun({Tag, Pid}, Consumer),
+ maps:put({Tag, Pid},
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ Active,
+ ActivityStatus,
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)},
+ Acc)
+ end, #{}, WaitingConsumers),
+ maps:merge(FromConsumers, FromWaitingConsumers).
+query_single_active_consumer(#?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ consumers = Consumers}) ->
+ case maps:size(Consumers) of
+ 0 ->
+ {error, no_value};
+ 1 ->
+ {value, lists:nth(1, maps:keys(Consumers))};
+ _
+ ->
+ {error, illegal_size}
+ end ;
+query_single_active_consumer(_) ->
+ disabled.
+query_stat(#?MODULE{consumers = Consumers} = State) ->
+ {messages_ready(State), maps:size(Consumers)}.
+query_in_memory_usage(#?MODULE{msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length}) ->
+ {Length, Bytes}.
+query_peek(Pos, State0) when Pos > 0 ->
+ case take_next_msg(State0) of
+ empty ->
+ {error, no_message_at_pos};
+ {{_Seq, IdxMsg}, _State}
+ when Pos == 1 ->
+ {ok, IdxMsg};
+ {_Msg, State} ->
+ query_peek(Pos-1, State)
+ end.
+-spec usage(atom()) -> float().
+usage(Name) when is_atom(Name) ->
+ case ets:lookup(rabbit_fifo_usage, Name) of
+ [] -> 0.0;
+ [{_, Use}] -> Use
+ end.
+%%% Internal
+messages_ready(#?MODULE{messages = M,
+ prefix_msgs = {RCnt, _R, PCnt, _P},
+ returns = R}) ->
+ %% prefix messages will rarely have anything in them during normal
+ %% operations so length/1 is fine here
+ lqueue:len(M) + lqueue:len(R) + RCnt + PCnt.
+messages_total(#?MODULE{ra_indexes = I,
+ prefix_msgs = {RCnt, _R, PCnt, _P}}) ->
+ rabbit_fifo_index:size(I) + RCnt + PCnt.
+update_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_use({active, Since, Avg}, inactive) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {inactive, Now, Now - Since, Avg};
+update_use({inactive, Since, Active, Avg}, active) ->
+ Now = erlang:monotonic_time(micro_seconds),
+ {active, Now, use_avg(Active, Now - Since, Avg)}.
+utilisation({active, Since, Avg}) ->
+ use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg);
+utilisation({inactive, Since, Active, Avg}) ->
+ use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg).
+use_avg(0, 0, Avg) ->
+ Avg;
+use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
+moving_average(_Time, _, Next, undefined) ->
+ Next;
+moving_average(Time, HalfLife, Next, Current) ->
+ Weight = math:exp(Time * math:log(0.5) / HalfLife),
+ Next * (1 - Weight) + Current * Weight.
+num_checked_out(#?MODULE{consumers = Cons}) ->
+ maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
+ maps:size(C) + Acc
+ end, 0, Cons).
+cancel_consumer(Meta, ConsumerId,
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
+ Effects, Reason) ->
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = []} = State,
+ Effects, Reason) ->
+ %% single active consumer on, no consumers are waiting
+ cancel_consumer0(Meta, ConsumerId, State, Effects, Reason);
+cancel_consumer(Meta, ConsumerId,
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0} = State0,
+ Effects0, Reason) ->
+ %% single active consumer on, consumers are waiting
+ case maps:is_key(ConsumerId, Cons0) of
+ true ->
+ % The active consumer is to be removed
+ {State1, Effects1} = cancel_consumer0(Meta, ConsumerId, State0,
+ Effects0, Reason),
+ activate_next_consumer(State1, Effects1);
+ false ->
+ % The cancelled consumer is not active or cancelled
+ % Just remove it from idle_consumers
+ Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
+ Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
+ % A waiting consumer isn't supposed to have any checked out messages,
+ % so nothing special to do here
+ {State0#?MODULE{waiting_consumers = Waiting}, Effects}
+ end.
+consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
+ ConsumerId, #consumer{meta = Meta},
+ Active, ActivityStatus,
+ Effects) ->
+ Ack = maps:get(ack, Meta, undefined),
+ Prefetch = maps:get(prefetch, Meta, undefined),
+ Args = maps:get(args, Meta, []),
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
+ [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
+ | Effects].
+cancel_consumer0(Meta, ConsumerId,
+ #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
+ case C0 of
+ #{ConsumerId := Consumer} ->
+ {S, Effects2} = maybe_return_all(Meta, ConsumerId, Consumer,
+ S0, Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
+ case maps:size(S#?MODULE.consumers) of
+ 0 ->
+ {S, [{aux, inactive} | Effects]};
+ _ ->
+ {S, Effects}
+ end;
+ _ ->
+ %% already removed: do nothing
+ {S0, Effects0}
+ end.
+activate_next_consumer(#?MODULE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
+ Effects0) ->
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?MODULE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, [{aux, inactive} | Effects0]}
+ end;
+ _ ->
+ {State0, Effects0}
+ end.
+maybe_return_all(#{system_time := Ts} = Meta, ConsumerId, Consumer, S0, Effects0, Reason) ->
+ case Reason of
+ consumer_cancel ->
+ {update_or_remove_sub(Meta, ConsumerId,
+ Consumer#consumer{lifetime = once,
+ credit = 0,
+ status = cancelled},
+ S0), Effects0};
+ down ->
+ {S1, Effects1} = return_all(Meta, S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers),
+ last_active = Ts},
+ Effects1}
+ end.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx, State1),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ State2 = add_bytes_drop(Header, State1#?MODULE{ra_indexes = Indexes}),
+ State = case Msg of
+ 'empty' -> State2;
+ _ -> subtract_in_memory_counts(Header, State2)
+ end,
+ Effects = dead_letter_effects(maxlen, #{none => FullMsg},
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Header}, State1} ->
+ State2 = subtract_in_memory_counts(Header, add_bytes_drop(Header, State1)),
+ {State2, Effects0};
+ {{'$empty_msg', Header}, State1} ->
+ State2 = add_bytes_drop(Header, State1),
+ {State2, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
+ {State1, Msg} =
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}};
+ false ->
+ {add_in_memory_counts(Header, State0),
+ {RaftIdx, {Header, RawMsg}}} % indexed message with header map
+ end,
+ State = add_bytes_enqueue(Header, State1),
+ State#?MODULE{messages = lqueue:in({NextMsgNum, Msg}, Messages),
+ next_msg_num = NextMsgNum + 1}.
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
+ State#?MODULE{ra_indexes = Indexes}.
+incr_enqueue_count(#?MODULE{enqueue_count = EC,
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) when EC >= C->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first needs to run the checkout logic.
+ State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
+ State#?MODULE{enqueue_count = C + 1}.
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
+ enqueue_count = 0,
+ release_cursors = Cursors0} = State0) ->
+ case rabbit_fifo_index:exists(RaftIdx, Indexes) of
+ false ->
+ %% the incoming enqueue must already have been dropped
+ State0;
+ true ->
+ Interval = case Base of
+ 0 -> 0;
+ _ ->
+ Total = messages_total(State0),
+ min(max(Total, Base), ?RELEASE_CURSOR_EVERY_MAX)
+ end,
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}},
+ Dehydrated = dehydrate_state(State),
+ Cursor = {release_cursor, RaftIdx, Dehydrated},
+ Cursors = lqueue:in(Cursor, Cursors0),
+ State#?MODULE{release_cursors = Cursors}
+ end;
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
+ #enqueuer{next_seqno = Next,
+ pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0,
+ State0) ->
+ State = enqueue(RaftIdx, RawMsg, State0),
+ Enq = Enq0#enqueuer{next_seqno = Next + 1, pending = Pending},
+ enqueue_pending(From, Enq, State);
+enqueue_pending(From, Enq, #?MODULE{enqueuers = Enqueuers0} = State) ->
+ State#?MODULE{enqueuers = Enqueuers0#{From => Enq}}.
+maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
+ % direct enqueue without tracking
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
+maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
+ #?MODULE{enqueuers = Enqueuers0} = State0) ->
+ case maps:get(From, Enqueuers0, undefined) of
+ undefined ->
+ State1 = State0#?MODULE{enqueuers = Enqueuers0#{From => #enqueuer{}}},
+ {ok, State, Effects} = maybe_enqueue(RaftIdx, From, MsgSeqNo,
+ RawMsg, Effects0, State1),
+ {ok, State, [{monitor, process, From} | Effects]};
+ #enqueuer{next_seqno = MsgSeqNo} = Enq0 ->
+ % it is the next expected seqno
+ State1 = enqueue(RaftIdx, RawMsg, State0),
+ Enq = Enq0#enqueuer{next_seqno = MsgSeqNo + 1},
+ State = enqueue_pending(From, Enq, State1),
+ {ok, State, Effects0};
+ #enqueuer{next_seqno = Next,
+ pending = Pending0} = Enq0
+ when MsgSeqNo > Next ->
+ % out of order delivery
+ Pending = [{MsgSeqNo, RaftIdx, RawMsg} | Pending0],
+ Enq = Enq0#enqueuer{pending = lists:sort(Pending)},
+ {ok, State0#?MODULE{enqueuers = Enqueuers0#{From => Enq}}, Effects0};
+ #enqueuer{next_seqno = Next} when MsgSeqNo =< Next ->
+ % duplicate delivery - remove the raft index from the ra_indexes
+ % map as it was added earlier
+ {duplicate, State0, Effects0}
+ end.
+snd(T) ->
+ element(2, T).
+return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
+ Effects0, State0) ->
+ {State1, Effects1} = maps:fold(
+ fun(MsgId, {Tag, _} = Msg, {S0, E0})
+ when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
+ return_one(Meta, MsgId, 0, Msg, S0, E0, ConsumerId);
+ (MsgId, {MsgNum, Msg}, {S0, E0}) ->
+ return_one(Meta, MsgId, MsgNum, Msg, S0, E0,
+ ConsumerId)
+ end, {State0, Effects0}, Returned),
+ State2 =
+ case State1#?MODULE.consumers of
+ #{ConsumerId := Con0} ->
+ Con = Con0#consumer{credit = increase_credit(Con0,
+ map_size(Returned))},
+ update_or_remove_sub(Meta, ConsumerId, Con, State1);
+ _ ->
+ State1
+ end,
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+% used to processes messages that are finished
+complete(Meta, ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects,
+ #?MODULE{ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
+ MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
+ %% credit_mode = simple_prefetch should automatically top-up credit
+ %% as messages are simple_prefetch or otherwise returned
+ Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
+ credit = increase_credit(Con0, map_size(Discarded))},
+ State1 = update_or_remove_sub(Meta, ConsumerId, Con, State0),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ MsgRaftIdxs),
+ %% TODO: use maps:fold instead
+ State2 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
+ end, State1, maps:values(Discarded)),
+ {State2#?MODULE{ra_indexes = Indexes}, Effects}.
+increase_credit(#consumer{lifetime = once,
+ credit = Credit}, _) ->
+ %% once consumers cannot increment credit
+ Credit;
+increase_credit(#consumer{lifetime = auto,
+ credit_mode = credited,
+ credit = Credit}, _) ->
+ %% credit_mode: credit also doesn't automatically increment credit
+ Credit;
+increase_credit(#consumer{credit = Current}, Credit) ->
+ Current + Credit.
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
+ #consumer{checked_out = Checked0} = Con0,
+ Effects0, State0) ->
+ Discarded = maps:with(MsgIds, Checked0),
+ {State2, Effects1} = complete(Meta, ConsumerId, Discarded, Con0,
+ Effects0, State0),
+ {State, ok, Effects} = checkout(Meta, State0, State2, Effects1),
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
+dead_letter_effects(_Reason, _Discarded,
+ #?MODULE{cfg = #cfg{dead_letter_handler = undefined}},
+ Effects) ->
+ Effects;
+dead_letter_effects(Reason, Discarded,
+ #?MODULE{cfg = #cfg{dead_letter_handler = {Mod, Fun, Args}}},
+ Effects) ->
+ RaftIdxs = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ [RaftIdx | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{log, RaftIdxs,
+ fun (Log) ->
+ Lookup = maps:from_list(lists:zip(RaftIdxs, Log)),
+ DeadLetters = maps:fold(
+ fun (_, {_, {RaftIdx, {_Header, 'empty'}}}, Acc) ->
+ {enqueue, _, _, Msg} = maps:get(RaftIdx, Lookup),
+ [{Reason, Msg} | Acc];
+ (_, {_, {_, {_Header, Msg}}}, Acc) ->
+ [{Reason, Msg} | Acc];
+ (_, _, Acc) ->
+ Acc
+ end, [], Discarded),
+ [{mod_call, Mod, Fun, Args ++ [DeadLetters]}]
+ end} | Effects].
+ #?MODULE{cfg = #cfg{resource = QName}}, Effects) ->
+ [{mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [QName, ConsumerId]} | Effects].
+update_smallest_raft_index(Idx, State, Effects) ->
+ update_smallest_raft_index(Idx, ok, State, Effects).
+update_smallest_raft_index(IncomingRaftIdx, Reply,
+ #?MODULE{cfg = Cfg,
+ ra_indexes = Indexes,
+ release_cursors = Cursors0} = State0,
+ Effects) ->
+ case rabbit_fifo_index:size(Indexes) of
+ 0 ->
+ % there are no messages on queue anymore and no pending enqueues
+ % we can forward release_cursor all the way until
+ % the last received command, hooray
+ %% reset the release cursor interval
+ #cfg{release_cursor_interval = {Base, _}} = Cfg,
+ RCI = {Base, Base},
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = RCI},
+ release_cursors = lqueue:new(),
+ enqueue_count = 0},
+ {State, Reply, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
+ _ ->
+ Smallest = rabbit_fifo_index:smallest(Indexes),
+ case find_next_cursor(Smallest, Cursors0) of
+ {empty, Cursors} ->
+ {State0#?MODULE{release_cursors = Cursors}, Reply, Effects};
+ {Cursor, Cursors} ->
+ %% we can emit a release cursor when we've passed the smallest
+ %% release cursor available.
+ {State0#?MODULE{release_cursors = Cursors}, Reply,
+ Effects ++ [Cursor]}
+ end
+ end.
+find_next_cursor(Idx, Cursors) ->
+ find_next_cursor(Idx, Cursors, empty).
+find_next_cursor(Smallest, Cursors0, Potential) ->
+ case lqueue:out(Cursors0) of
+ {{value, {_, Idx, _} = Cursor}, Cursors} when Idx < Smallest ->
+ %% we found one but it may not be the largest one
+ find_next_cursor(Smallest, Cursors, Cursor);
+ _ ->
+ {Potential, Cursors0}
+ end.
+update_header(Key, UpdateFun, Default, Header)
+ when is_integer(Header) ->
+ update_header(Key, UpdateFun, Default, #{size => Header});
+update_header(Key, UpdateFun, Default, Header) ->
+ maps:update_with(Key, UpdateFun, Default, Header).
+return_one(Meta, MsgId, 0, {Tag, Header0},
+ #?MODULE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {Tag, Header},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ complete(Meta, ConsumerId, #{MsgId => Msg0}, Con0, Effects0, State0);
+ _ ->
+ %% this should not affect the release cursor in any way
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ {Msg, State1} = case Tag of
+ '$empty_msg' ->
+ {Msg0, State0};
+ _ -> case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{'$empty_msg', Header}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in(Msg, Returns)}),
+ Effects0}
+ end;
+return_one(Meta, MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
+ #?MODULE{returns = Returns,
+ consumers = Consumers,
+ cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
+ Effects0, ConsumerId) ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
+ Msg0 = {RaftId, {Header, RawMsg}},
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ DlMsg = {MsgNum, Msg0},
+ Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
+ State0, Effects0),
+ complete(Meta, ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
+ _ ->
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ %% this should not affect the release cursor in any way
+ {Msg, State1} = case RawMsg of
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
+ end,
+ {add_bytes_return(
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in({MsgNum, Msg}, Returns)}),
+ Effects0}
+ end.
+return_all(Meta, #?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+ #consumer{checked_out = Checked0} = Con) ->
+ %% need to sort the list so that we return messages in the order
+ %% they were checked out
+ Checked = lists:sort(maps:to_list(Checked0)),
+ State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
+ lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(Meta, MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {MsgNum, Msg}}, {S, E}) ->
+ return_one(Meta, MsgId, MsgNum, Msg, S, E, ConsumerId)
+ end, {State, Effects0}, Checked).
+%% checkout new messages to consumers
+checkout(#{index := Index} = Meta, OldState, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(Meta, checkout_one(Meta, State0),
+ Effects0, {#{}, #{}}),
+ case evaluate_limit(Index, false, OldState, State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
+checkout0(Meta, {success, ConsumerId, MsgId, {RaftIdx, {Header, 'empty'}}, State},
+ Effects, {SendAcc, LogAcc0}) ->
+ DelMsg = {RaftIdx, {MsgId, Header}},
+ LogAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], LogAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(Meta, {success, ConsumerId, MsgId, Msg, State}, Effects,
+ {SendAcc0, LogAcc}) ->
+ DelMsg = {MsgId, Msg},
+ SendAcc = maps:update_with(ConsumerId,
+ fun (M) -> [DelMsg | M] end,
+ [DelMsg], SendAcc0),
+ checkout0(Meta, checkout_one(Meta, State), Effects, {SendAcc, LogAcc});
+checkout0(_Meta, {Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(
+ append_log_effects(Effects0, LogAcc), SendAcc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+evaluate_limit(_Index, Result, _BeforeState,
+ #?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(Index, Result, BeforeState,
+ #?MODULE{cfg = #cfg{overflow_strategy = Strategy},
+ enqueuers = Enqs0} = State0,
+ Effects0) ->
+ case is_over_limit(State0) of
+ true when Strategy == drop_head ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(Index, true, BeforeState, State, Effects);
+ true when Strategy == reject_publish ->
+ %% generate send_msg effect for each enqueuer to let them know
+ %% they need to block
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{blocked = undefined} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = Index},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, reject_publish},
+ [ra_event]} | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ false when Strategy == reject_publish ->
+ %% TODO: optimise as this case gets called for every command
+ %% pretty much
+ Before = is_below_soft_limit(BeforeState),
+ case {Before, is_below_soft_limit(State0)} of
+ {false, true} ->
+ %% we have moved below the lower limit which
+ {Enqs, Effects} =
+ maps:fold(
+ fun (P, #enqueuer{} = E0, {Enqs, Acc}) ->
+ E = E0#enqueuer{blocked = undefined},
+ {Enqs#{P => E},
+ [{send_msg, P, {queue_status, go}, [ra_event]}
+ | Acc]};
+ (_P, _E, Acc) ->
+ Acc
+ end, {Enqs0, Effects0}, Enqs0),
+ {State0#?MODULE{enqueuers = Enqs}, Result, Effects};
+ _ ->
+ {State0, Result, Effects0}
+ end;
+ false ->
+ {State0, Result, Effects0}
+ end.
+ #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
+ false;
+evaluate_memory_limit(#{size := Size}, State) ->
+ evaluate_memory_limit(Size, State);
+ #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length})
+ when is_integer(Size) ->
+ (Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
+append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
+ Effects;
+append_send_msg_effects(Effects0, AccMap) ->
+ Effects = maps:fold(fun (C, Msgs, Ef) ->
+ [send_msg_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap),
+ [{aux, active} | Effects].
+append_log_effects(Effects0, AccMap) ->
+ maps:fold(fun (C, Msgs, Ef) ->
+ [send_log_effect(C, lists:reverse(Msgs)) | Ef]
+ end, Effects0, AccMap).
+%% next message is determined as follows:
+%% First we check if there are are prefex returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#?MODULE{prefix_msgs = {R, P}} = State) ->
+ %% conversion
+ take_next_msg(State#?MODULE{prefix_msgs = {length(R), R, length(P), P}});
+take_next_msg(#?MODULE{prefix_msgs = {NumR, [{'$empty_msg', _} = Msg | Rem],
+ NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {Msg, State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?MODULE{prefix_msgs = {NumR, [Header | Rem], NumP, P}} = State) ->
+ %% there are prefix returns, these should be served first
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {NumR-1, Rem, NumP, P}}};
+take_next_msg(#?MODULE{returns = Returns,
+ messages = Messages0,
+ prefix_msgs = {NumR, R, NumP, P}} = State) ->
+ %% use peek rather than out there as the most likely case is an empty
+ %% queue
+ case lqueue:peek(Returns) of
+ {value, NextMsg} ->
+ {NextMsg,
+ State#?MODULE{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
+ case lqueue:out(Messages0) of
+ {empty, _} ->
+ empty;
+ {{value, {_, _} = SeqMsg}, Messages} ->
+ {SeqMsg, State#?MODULE{messages = Messages }}
+ end;
+ empty ->
+ [Msg | Rem] = P,
+ case Msg of
+ {Header, 'empty'} ->
+ %% There are prefix msgs
+ {{'$empty_msg', Header},
+ State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}};
+ Header ->
+ {{'$prefix_msg', Header},
+ State#?MODULE{prefix_msgs = {NumR, R, NumP-1, Rem}}}
+ end
+ end.
+send_msg_effect({CTag, CPid}, Msgs) ->
+ {send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}.
+send_log_effect({CTag, CPid}, IdxMsgs) ->
+ {RaftIdxs, Data} = lists:unzip(IdxMsgs),
+ {log, RaftIdxs,
+ fun(Log) ->
+ Msgs = lists:zipwith(fun ({enqueue, _, _, Msg}, {MsgId, Header}) ->
+ {MsgId, {Header, Msg}}
+ end, Log, Data),
+ [{send_msg, CPid, {delivery, CTag, Msgs}, [local, ra_event]}]
+ end,
+ {local, node(CPid)}}.
+reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
+ {log, [RaftIdx],
+ fun([{enqueue, _, _, Msg}]) ->
+ [{reply, From, {wrap_reply,
+ {dequeue, {MsgId, {Header, Msg}}, Ready}}}]
+ end}.
+checkout_one(Meta, #?MODULE{service_queue = SQ0,
+ messages = Messages0,
+ consumers = Cons0} = InitState) ->
+ case priority_queue:out(SQ0) of
+ {{value, ConsumerId}, SQ1} ->
+ case take_next_msg(InitState) of
+ {ConsumerMsg, State0} ->
+ %% there are consumers waiting to be serviced
+ %% process consumer checkout
+ case maps:find(ConsumerId, Cons0) of
+ {ok, #consumer{credit = 0}} ->
+ %% no credit but was still on queue
+ %% can happen when draining
+ %% recurse without consumer on queue
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ {ok, #consumer{status = cancelled}} ->
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ {ok, #consumer{status = suspected_down}} ->
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1});
+ {ok, #consumer{checked_out = Checked0,
+ next_msg_id = Next,
+ credit = Credit,
+ delivery_count = DelCnt} = Con0} ->
+ Checked = maps:put(Next, ConsumerMsg, Checked0),
+ Con = Con0#consumer{checked_out = Checked,
+ next_msg_id = Next + 1,
+ credit = Credit - 1,
+ delivery_count = DelCnt + 1},
+ State1 = update_or_remove_sub(Meta,
+ ConsumerId, Con,
+ State0#?MODULE{service_queue = SQ1}),
+ {State, Msg} =
+ case ConsumerMsg of
+ {'$prefix_msg', Header} ->
+ {subtract_in_memory_counts(
+ Header, add_bytes_checkout(Header, State1)),
+ ConsumerMsg};
+ {'$empty_msg', Header} ->
+ {add_bytes_checkout(Header, State1),
+ ConsumerMsg};
+ {_, {_, {Header, 'empty'}} = M} ->
+ {add_bytes_checkout(Header, State1),
+ M};
+ {_, {_, {Header, _} = M}} ->
+ {subtract_in_memory_counts(
+ Header,
+ add_bytes_checkout(Header, State1)),
+ M}
+ end,
+ {success, ConsumerId, Next, Msg, State};
+ error ->
+ %% consumer did not exist but was queued, recurse
+ checkout_one(Meta, InitState#?MODULE{service_queue = SQ1})
+ end;
+ empty ->
+ {nochange, InitState}
+ end;
+ {empty, _} ->
+ case lqueue:len(Messages0) of
+ 0 -> {nochange, InitState};
+ _ -> {inactive, InitState}
+ end
+ end.
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto,
+ credit = 0} = Con,
+ #?MODULE{consumers = Cons} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)};
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = auto} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)};
+update_or_remove_sub(#{system_time := Ts},
+ ConsumerId, #consumer{lifetime = once,
+ checked_out = Checked,
+ credit = 0} = Con,
+ #?MODULE{consumers = Cons} = State) ->
+ case maps:size(Checked) of
+ 0 ->
+ % we're done with this consumer
+ State#?MODULE{consumers = maps:remove(ConsumerId, Cons),
+ last_active = Ts};
+ _ ->
+ % there are unsettled items so need to keep around
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons)}
+ end;
+update_or_remove_sub(_Meta, ConsumerId, #consumer{lifetime = once} = Con,
+ #?MODULE{consumers = Cons,
+ service_queue = ServiceQueue} = State) ->
+ State#?MODULE{consumers = maps:put(ConsumerId, Con, Cons),
+ service_queue = uniq_queue_in(ConsumerId, Con, ServiceQueue)}.
+uniq_queue_in(Key, #consumer{priority = P}, Queue) ->
+ % TODO: queue:member could surely be quite expensive, however the practical
+ % number of unique consumers may not be large enough for it to matter
+ case priority_queue:member(Key, Queue) of
+ true ->
+ Queue;
+ false ->
+ priority_queue:in(Key, P, Queue)
+ end.
+update_consumer(ConsumerId, Meta, Spec, Priority,
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
+ %% general case, single active consumer off
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, Spec, Priority,
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active}} = State0)
+ when map_size(Cons0) == 0 ->
+ %% single active consumer on, no one is consuming yet
+ update_consumer0(ConsumerId, Meta, Spec, Priority, State0);
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
+ #?MODULE{cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = WaitingConsumers0} = State0) ->
+ %% single active consumer on and one active consumer already
+ %% adding the new consumer to the waiting list
+ Consumer = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
+ credit = Credit, credit_mode = Mode},
+ WaitingConsumers1 = WaitingConsumers0 ++ [{ConsumerId, Consumer}],
+ State0#?MODULE{waiting_consumers = WaitingConsumers1}.
+update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, Priority,
+ #?MODULE{consumers = Cons0,
+ service_queue = ServiceQueue0} = State0) ->
+ %% TODO: this logic may not be correct for updating a pre-existing consumer
+ Init = #consumer{lifetime = Life, meta = Meta,
+ priority = Priority,
+ credit = Credit, credit_mode = Mode},
+ Cons = maps:update_with(ConsumerId,
+ fun(S) ->
+ %% remove any in-flight messages from
+ %% the credit update
+ N = maps:size(S#consumer.checked_out),
+ C = max(0, Credit - N),
+ S#consumer{lifetime = Life, credit = C}
+ end, Init, Cons0),
+ ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
+ ServiceQueue0),
+ State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
+maybe_queue_consumer(ConsumerId, #consumer{credit = Credit} = Con,
+ ServiceQueue0) ->
+ case Credit > 0 of
+ true ->
+ % consumerect needs service - check if already on service queue
+ uniq_queue_in(ConsumerId, Con, ServiceQueue0);
+ false ->
+ ServiceQueue0
+ end.
+%% creates a dehydrated version of the current state to be cached and
+%% potentially used to for a snaphot at a later point
+dehydrate_state(#?MODULE{messages = Messages,
+ consumers = Consumers,
+ returns = Returns,
+ prefix_msgs = {PRCnt, PrefRet0, PPCnt, PrefMsg0},
+ waiting_consumers = Waiting0} = State) ->
+ RCnt = lqueue:len(Returns),
+ %% TODO: optimise this function as far as possible
+ PrefRet1 = lists:foldr(fun ({'$prefix_msg', Header}, Acc) ->
+ [Header | Acc];
+ ({'$empty_msg', _} = Msg, Acc) ->
+ [Msg | Acc];
+ ({_, {_, {Header, 'empty'}}}, Acc) ->
+ [{'$empty_msg', Header} | Acc];
+ ({_, {_, {Header, _}}}, Acc) ->
+ [Header | Acc]
+ end,
+ [],
+ lqueue:to_list(Returns)),
+ PrefRet = PrefRet0 ++ PrefRet1,
+ PrefMsgsSuff = dehydrate_messages(Messages, []),
+ %% prefix messages are not populated in normal operation only after
+ %% recovering from a snapshot
+ PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
+ Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
+ State#?MODULE{messages = lqueue:new(),
+ ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
+ consumers = maps:map(fun (_, C) ->
+ dehydrate_consumer(C)
+ end, Consumers),
+ returns = lqueue:new(),
+ prefix_msgs = {PRCnt + RCnt, PrefRet,
+ PPCnt + lqueue:len(Messages), PrefMsgs},
+ waiting_consumers = Waiting}.
+%% TODO make body recursive to avoid allocating lists:reverse call
+dehydrate_messages(Msgs0, Acc0) ->
+ {OutRes, Msgs} = lqueue:out(Msgs0),
+ case OutRes of
+ {value, {_MsgId, {_RaftId, {_, 'empty'} = Msg}}} ->
+ dehydrate_messages(Msgs, [Msg | Acc0]);
+ {value, {_MsgId, {_RaftId, {Header, _}}}} ->
+ dehydrate_messages(Msgs, [Header | Acc0]);
+ empty ->
+ lists:reverse(Acc0)
+ end.
+dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {'$empty_msg', _} = M) ->
+ M;
+ (_, {_, {_, {Header, 'empty'}}}) ->
+ {'$empty_msg', Header};
+ (_, {_, {_, {Header, _}}}) ->
+ {'$prefix_msg', Header}
+ end, Checked0),
+ Con#consumer{checked_out = Checked}.
+%% make the state suitable for equality comparison
+normalize(#?MODULE{messages = Messages,
+ release_cursors = Cursors} = State) ->
+ State#?MODULE{messages = lqueue:from_list(lqueue:to_list(Messages)),
+ release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+is_over_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = undefined,
+ max_bytes = undefined}}) ->
+ false;
+is_below_soft_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
+ max_bytes = MaxBytes},
+ msg_bytes_enqueue = BytesEnq} = State) ->
+ is_below(MaxLength, messages_ready(State)) andalso
+ is_below(MaxBytes, BytesEnq).
+is_below(undefined, _Num) ->
+ true;
+is_below(Val, Num) when is_integer(Val) andalso is_integer(Num) ->
+ Num =< trunc(Val * ?LOW_LIMIT).
+-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
+make_enqueue(Pid, Seq, Msg) ->
+ #enqueue{pid = Pid, seq = Seq, msg = Msg}.
+-spec make_register_enqueuer(pid()) -> protocol().
+make_register_enqueuer(Pid) ->
+ #register_enqueuer{pid = Pid}.
+-spec make_checkout(consumer_id(),
+ checkout_spec(), consumer_meta()) -> protocol().
+make_checkout(ConsumerId, Spec, Meta) ->
+ #checkout{consumer_id = ConsumerId,
+ spec = Spec, meta = Meta}.
+-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
+make_settle(ConsumerId, MsgIds) when is_list(MsgIds) ->
+ #settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
+-spec make_return(consumer_id(), [msg_id()]) -> protocol().
+make_return(ConsumerId, MsgIds) ->
+ #return{consumer_id = ConsumerId, msg_ids = MsgIds}.
+-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
+make_discard(ConsumerId, MsgIds) ->
+ #discard{consumer_id = ConsumerId, msg_ids = MsgIds}.
+-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(),
+ boolean()) -> protocol().
+make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
+ #credit{consumer_id = ConsumerId,
+ credit = Credit,
+ delivery_count = DeliveryCount,
+ drain = Drain}.
+-spec make_purge() -> protocol().
+make_purge() -> #purge{}.
+-spec make_garbage_collection() -> protocol().
+make_garbage_collection() -> #garbage_collection{}.
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+ #purge_nodes{nodes = Nodes}.
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+ #update_config{config = Config}.
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_enqueue(#{size := Bytes}, State) ->
+ add_bytes_enqueue(Bytes, State).
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_drop(#{size := Bytes}, State) ->
+ add_bytes_drop(Bytes, State).
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
+ msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_checkout(#{size := Bytes}, State) ->
+ add_bytes_checkout(Bytes, State).
+ #?MODULE{msg_bytes_checkout = Checkout} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+add_bytes_settle(#{size := Bytes}, State) ->
+ add_bytes_settle(Bytes, State).
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
+ msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_return(#{size := Bytes}, State) ->
+ add_bytes_return(Bytes, State).
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
+ msgs_ready_in_memory = InMemoryCount + 1};
+add_in_memory_counts(#{size := Bytes}, State) ->
+ add_in_memory_counts(Bytes, State).
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
+ msgs_ready_in_memory = InMemoryCount - 1};
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+ subtract_in_memory_counts(Bytes, State).
+message_size(#basic_message{content = Content}) ->
+ #content{payload_fragments_rev = PFR} = Content,
+ iolist_size(PFR);
+message_size({'$prefix_msg', H}) ->
+ get_size_from_header(H);
+message_size({'$empty_msg', H}) ->
+ get_size_from_header(H);
+message_size(B) when is_binary(B) ->
+ byte_size(B);
+message_size(Msg) ->
+ %% probably only hit this for testing so ok to use erts_debug
+ erts_debug:size(Msg).
+get_size_from_header(Size) when is_integer(Size) ->
+ Size;
+get_size_from_header(#{size := B}) ->
+ B.
+all_nodes(#?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, #{}, Cons0),
+ Nodes1 = maps:fold(fun(P, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes0, Enqs0),
+ maps:keys(
+ lists:foldl(fun({{_, P}, _}, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes1, WaitingConsumers0)).
+all_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P}, _}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+suspected_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P},
+ #consumer{status = suspected_down}}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+is_expired(Ts, #?MODULE{cfg = #cfg{expires = Expires},
+ last_active = LastActive,
+ consumers = Consumers})
+ when is_number(LastActive) andalso is_number(Expires) ->
+ %% TODO: should it be active consumers?
+ Active = maps:filter(fun (_, #consumer{status = suspected_down}) ->
+ false;
+ (_, _) ->
+ true
+ end, Consumers),
+ Ts > (LastActive + Expires) andalso maps:size(Active) == 0;
+is_expired(_Ts, _State) ->
+ false.
+get_priority_from_args(#{args := Args}) ->
+ case rabbit_misc:table_lookup(Args, <<"x-priority">>) of
+ {_Key, Value} ->
+ Value;
+ _ -> 0
+ end;
+get_priority_from_args(_) ->
+ 0.