summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_queue_type.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_queue_type.erl')
-rw-r--r--deps/rabbit/src/rabbit_queue_type.erl581
1 files changed, 581 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl
new file mode 100644
index 0000000000..4e59b6a7c0
--- /dev/null
+++ b/deps/rabbit/src/rabbit_queue_type.erl
@@ -0,0 +1,581 @@
+-module(rabbit_queue_type).
+-include("amqqueue.hrl").
+-include_lib("rabbit_common/include/resource.hrl").
+
+-export([
+ init/0,
+ close/1,
+ discover/1,
+ default/0,
+ is_enabled/1,
+ declare/2,
+ delete/4,
+ is_recoverable/1,
+ recover/2,
+ purge/1,
+ policy_changed/1,
+ stat/1,
+ remove/2,
+ info/2,
+ state_info/1,
+ info_down/2,
+ info_down/3,
+ %% stateful client API
+ new/2,
+ consume/3,
+ cancel/5,
+ handle_down/3,
+ handle_event/3,
+ module/2,
+ deliver/3,
+ settle/5,
+ credit/5,
+ dequeue/5,
+ fold_state/3,
+ is_policy_applicable/2,
+ is_server_named_allowed/1
+ ]).
+
+%% gah what is a good identity of a classic queue including all replicas
+-type queue_name() :: rabbit_types:r(queue).
+-type queue_ref() :: queue_name() | atom().
+-type queue_state() :: term().
+-type msg_tag() :: term().
+
+-define(STATE, ?MODULE).
+
+%% Recoverable slaves shouldn't really be a generic one, but let's keep it here until
+%% mirrored queues are deprecated.
+-define(DOWN_KEYS, [name, durable, auto_delete, arguments, pid, recoverable_slaves, type, state]).
+
+-define(QREF(QueueReference),
+ (is_tuple(QueueReference) andalso element(1, QueueReference) == resource)
+ orelse is_atom(QueueReference)).
+%% anything that the host process needs to do on behalf of the queue type
+%% session, like knowing when to notify on monitor down
+-type action() ::
+ {monitor, Pid :: pid(), queue_ref()} |
+ %% indicate to the queue type module that a message has been delivered
+ %% fully to the queue
+ {settled, Success :: boolean(), [msg_tag()]} |
+ {deliver, rabbit_types:ctag(), boolean(), [rabbit_amqqueue:qmsg()]}.
+
+-type actions() :: [action()].
+
+-type event() ::
+ {down, pid(), Info :: term()} |
+ term().
+
+-record(ctx, {module :: module(),
+ name :: queue_name(),
+ %% "publisher confirm queue accounting"
+ %% queue type implementation should emit a:
+ %% {settle, Success :: boolean(), msg_tag()}
+ %% to either settle or reject the delivery of a
+ %% message to the queue instance
+ %% The queue type module will then emit a {confirm | reject, [msg_tag()}
+ %% action to the channel or channel like process when a msg_tag
+ %% has reached its conclusion
+ state :: queue_state()}).
+
+
+-record(?STATE, {ctxs = #{} :: #{queue_ref() => #ctx{} | queue_ref()},
+ monitor_registry = #{} :: #{pid() => queue_ref()}
+ }).
+
+-opaque state() :: #?STATE{}.
+
+-type consume_spec() :: #{no_ack := boolean(),
+ channel_pid := pid(),
+ limiter_pid => pid(),
+ limiter_active => boolean(),
+ prefetch_count => non_neg_integer(),
+ consumer_tag := rabbit_types:ctag(),
+ exclusive_consume => boolean(),
+ args => rabbit_framing:amqp_table(),
+ ok_msg := term(),
+ acting_user := rabbit_types:username()}.
+
+
+
+% copied from rabbit_amqqueue
+-type absent_reason() :: 'nodedown' | 'crashed' | stopped | timeout.
+
+-type settle_op() :: 'complete' | 'requeue' | 'discard'.
+
+-export_type([state/0,
+ consume_spec/0,
+ action/0,
+ actions/0,
+ settle_op/0]).
+
+%% is the queue type feature enabled
+-callback is_enabled() -> boolean().
+
+-callback declare(amqqueue:amqqueue(), node()) ->
+ {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
+ {'absent', amqqueue:amqqueue(), absent_reason()} |
+ {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
+
+-callback delete(amqqueue:amqqueue(),
+ boolean(),
+ boolean(),
+ rabbit_types:username()) ->
+ rabbit_types:ok(non_neg_integer()) |
+ rabbit_types:error(in_use | not_empty) |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+
+-callback recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) ->
+ {Recovered :: [amqqueue:amqqueue()],
+ Failed :: [amqqueue:amqqueue()]}.
+
+%% checks if the queue should be recovered
+-callback is_recoverable(amqqueue:amqqueue()) ->
+ boolean().
+
+-callback purge(amqqueue:amqqueue()) ->
+ {ok, non_neg_integer()} | {error, term()}.
+
+-callback policy_changed(amqqueue:amqqueue()) -> ok.
+
+%% stateful
+%% intitialise and return a queue type specific session context
+-callback init(amqqueue:amqqueue()) -> queue_state().
+
+-callback close(queue_state()) -> ok.
+%% update the queue type state from amqqrecord
+-callback update(amqqueue:amqqueue(), queue_state()) -> queue_state().
+
+-callback consume(amqqueue:amqqueue(),
+ consume_spec(),
+ queue_state()) ->
+ {ok, queue_state(), actions()} | {error, term()} |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+
+-callback cancel(amqqueue:amqqueue(),
+ rabbit_types:ctag(),
+ term(),
+ rabbit_types:username(),
+ queue_state()) ->
+ {ok, queue_state()} | {error, term()}.
+
+%% any async events returned from the queue system should be processed through
+%% this
+-callback handle_event(Event :: event(),
+ queue_state()) ->
+ {ok, queue_state(), actions()} | {error, term()} | eol |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+
+-callback deliver([{amqqueue:amqqueue(), queue_state()}],
+ Delivery :: term()) ->
+ {[{amqqueue:amqqueue(), queue_state()}], actions()}.
+
+-callback settle(settle_op(), rabbit_types:ctag(), [non_neg_integer()], queue_state()) ->
+ {queue_state(), actions()} |
+ {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
+
+-callback credit(rabbit_types:ctag(),
+ non_neg_integer(), Drain :: boolean(), queue_state()) ->
+ {queue_state(), actions()}.
+
+-callback dequeue(NoAck :: boolean(), LimiterPid :: pid(),
+ rabbit_types:ctag(), queue_state()) ->
+ {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), queue_state()} |
+ {empty, queue_state()} |
+ {error, term()} |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+
+%% return a map of state summary information
+-callback state_info(queue_state()) ->
+ #{atom() := term()}.
+
+%% general queue info
+-callback info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+
+-callback stat(amqqueue:amqqueue()) ->
+ {'ok', non_neg_integer(), non_neg_integer()}.
+
+-callback capabilities() ->
+ #{atom() := term()}.
+
+%% TODO: this should be controlled by a registry that is populated on boot
+discover(<<"quorum">>) ->
+ rabbit_quorum_queue;
+discover(<<"classic">>) ->
+ rabbit_classic_queue;
+discover(<<"stream">>) ->
+ rabbit_stream_queue.
+
+default() ->
+ rabbit_classic_queue.
+
+-spec is_enabled(module()) -> boolean().
+is_enabled(Type) ->
+ Type:is_enabled().
+
+-spec declare(amqqueue:amqqueue(), node()) ->
+ {'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
+ {'absent', amqqueue:amqqueue(), absent_reason()} |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+declare(Q, Node) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:declare(Q, Node).
+
+-spec delete(amqqueue:amqqueue(), boolean(),
+ boolean(), rabbit_types:username()) ->
+ rabbit_types:ok(non_neg_integer()) |
+ rabbit_types:error(in_use | not_empty) |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+delete(Q, IfUnused, IfEmpty, ActingUser) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:delete(Q, IfUnused, IfEmpty, ActingUser).
+
+-spec purge(amqqueue:amqqueue()) ->
+ {'ok', non_neg_integer()} | {error, term()}.
+purge(Q) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:purge(Q).
+
+-spec policy_changed(amqqueue:amqqueue()) -> 'ok'.
+policy_changed(Q) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:policy_changed(Q).
+
+-spec stat(amqqueue:amqqueue()) ->
+ {'ok', non_neg_integer(), non_neg_integer()}.
+stat(Q) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:stat(Q).
+
+-spec remove(queue_ref(), state()) -> state().
+remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
+ case maps:take(QRef, Ctxs0) of
+ error ->
+ State;
+ {_, Ctxs} ->
+ State#?STATE{ctxs = Ctxs}
+ end.
+
+-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+info(Q, Items) when ?amqqueue_state_is(Q, crashed) ->
+ info_down(Q, Items, crashed);
+info(Q, Items) when ?amqqueue_state_is(Q, stopped) ->
+ info_down(Q, Items, stopped);
+info(Q, Items) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:info(Q, Items).
+
+fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) ->
+ maps:fold(Fun, Acc, Ctxs).
+
+state_info(#ctx{state = S,
+ module = Mod}) ->
+ Mod:state_info(S);
+state_info(_) ->
+ #{}.
+
+down_keys() -> ?DOWN_KEYS.
+
+info_down(Q, DownReason) ->
+ info_down(Q, down_keys(), DownReason).
+
+info_down(Q, all_keys, DownReason) ->
+ info_down(Q, down_keys(), DownReason);
+info_down(Q, Items, DownReason) ->
+ [{Item, i_down(Item, Q, DownReason)} || Item <- Items].
+
+i_down(name, Q, _) -> amqqueue:get_name(Q);
+i_down(durable, Q, _) -> amqqueue:is_durable(Q);
+i_down(auto_delete, Q, _) -> amqqueue:is_auto_delete(Q);
+i_down(arguments, Q, _) -> amqqueue:get_arguments(Q);
+i_down(pid, Q, _) -> amqqueue:get_pid(Q);
+i_down(recoverable_slaves, Q, _) -> amqqueue:get_recoverable_slaves(Q);
+i_down(type, Q, _) -> amqqueue:get_type(Q);
+i_down(state, _Q, DownReason) -> DownReason;
+i_down(_K, _Q, _DownReason) -> ''.
+
+is_policy_applicable(Q, Policy) ->
+ Mod = amqqueue:get_type(Q),
+ Capabilities = Mod:capabilities(),
+ Applicable = maps:get(policies, Capabilities, []),
+ lists:all(fun({P, _}) ->
+ lists:member(P, Applicable)
+ end, Policy).
+
+is_server_named_allowed(Type) ->
+ Capabilities = Type:capabilities(),
+ maps:get(server_named, Capabilities, false).
+
+-spec init() -> state().
+init() ->
+ #?STATE{}.
+
+-spec close(state()) -> ok.
+close(#?STATE{ctxs = Contexts}) ->
+ _ = maps:map(
+ fun (_, #ctx{module = Mod,
+ state = S}) ->
+ ok = Mod:close(S)
+ end, Contexts),
+ ok.
+
+-spec new(amqqueue:amqqueue(), state()) -> state().
+new(Q, State) when ?is_amqqueue(Q) ->
+ Ctx = get_ctx(Q, State),
+ set_ctx(Q, Ctx, State).
+
+-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
+ {ok, state(), actions()} | {error, term()}.
+consume(Q, Spec, State) ->
+ #ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
+ Mod = amqqueue:get_type(Q),
+ case Mod:consume(Q, Spec, CtxState0) of
+ {ok, CtxState, Actions} ->
+ return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
+ Err ->
+ Err
+ end.
+
+%% TODO switch to cancel spec api
+-spec cancel(amqqueue:amqqueue(),
+ rabbit_types:ctag(),
+ term(),
+ rabbit_types:username(),
+ state()) ->
+ {ok, state()} | {error, term()}.
+cancel(Q, Tag, OkMsg, ActiveUser, Ctxs) ->
+ #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
+ Mod = amqqueue:get_type(Q),
+ case Mod:cancel(Q, Tag, OkMsg, ActiveUser, State0) of
+ {ok, State} ->
+ {ok, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
+ Err ->
+ Err
+ end.
+
+-spec is_recoverable(amqqueue:amqqueue()) ->
+ boolean().
+is_recoverable(Q) ->
+ Mod = amqqueue:get_type(Q),
+ Mod:is_recoverable(Q).
+
+-spec recover(rabbit_types:vhost(), [amqqueue:amqqueue()]) ->
+ {Recovered :: [amqqueue:amqqueue()],
+ Failed :: [amqqueue:amqqueue()]}.
+recover(VHost, Qs) ->
+ ByType = lists:foldl(
+ fun (Q, Acc) ->
+ T = amqqueue:get_type(Q),
+ maps:update_with(T, fun (X) ->
+ [Q | X]
+ end, Acc)
+ %% TODO resolve all registered queue types from registry
+ end, #{rabbit_classic_queue => [],
+ rabbit_quorum_queue => [],
+ rabbit_stream_queue => []}, Qs),
+ maps:fold(fun (Mod, Queues, {R0, F0}) ->
+ {R, F} = Mod:recover(VHost, Queues),
+ {R0 ++ R, F0 ++ F}
+ end, {[], []}, ByType).
+
+-spec handle_down(pid(), term(), state()) ->
+ {ok, state(), actions()} | {eol, queue_ref()} | {error, term()}.
+handle_down(Pid, Info, #?STATE{monitor_registry = Reg0} = State0) ->
+ %% lookup queue ref in monitor registry
+ case maps:take(Pid, Reg0) of
+ {QRef, Reg} ->
+ case handle_event(QRef, {down, Pid, Info}, State0) of
+ {ok, State, Actions} ->
+ {ok, State#?STATE{monitor_registry = Reg}, Actions};
+ eol ->
+ {eol, QRef};
+ Err ->
+ Err
+ end;
+ error ->
+ {ok, State0, []}
+ end.
+
+%% messages sent from queues
+-spec handle_event(queue_ref(), term(), state()) ->
+ {ok, state(), actions()} | eol | {error, term()} |
+ {protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
+handle_event(QRef, Evt, Ctxs) ->
+ %% events can arrive after a queue state has been cleared up
+ %% so need to be defensive here
+ case get_ctx(QRef, Ctxs, undefined) of
+ #ctx{module = Mod,
+ state = State0} = Ctx ->
+ case Mod:handle_event(Evt, State0) of
+ {ok, State, Actions} ->
+ return_ok(set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions);
+ Err ->
+ Err
+ end;
+ undefined ->
+ {ok, Ctxs, []}
+ end.
+
+-spec module(queue_ref(), state()) ->
+ {ok, module()} | {error, not_found}.
+module(QRef, Ctxs) ->
+ %% events can arrive after a queue state has been cleared up
+ %% so need to be defensive here
+ case get_ctx(QRef, Ctxs, undefined) of
+ #ctx{module = Mod} ->
+ {ok, Mod};
+ undefined ->
+ {error, not_found}
+ end.
+
+-spec deliver([amqqueue:amqqueue()], Delivery :: term(),
+ stateless | state()) ->
+ {ok, state(), actions()}.
+deliver(Qs, Delivery, stateless) ->
+ _ = lists:map(fun(Q) ->
+ Mod = amqqueue:get_type(Q),
+ _ = Mod:deliver([{Q, stateless}], Delivery)
+ end, Qs),
+ {ok, stateless, []};
+deliver(Qs, Delivery, #?STATE{} = State0) ->
+ %% sort by queue type - then dispatch each group
+ ByType = lists:foldl(
+ fun (Q, Acc) ->
+ T = amqqueue:get_type(Q),
+ Ctx = get_ctx(Q, State0),
+ maps:update_with(
+ T, fun (A) ->
+ [{Q, Ctx#ctx.state} | A]
+ end, [{Q, Ctx#ctx.state}], Acc)
+ end, #{}, Qs),
+ %%% dispatch each group to queue type interface?
+ {Xs, Actions} = maps:fold(fun(Mod, QSs, {X0, A0}) ->
+ {X, A} = Mod:deliver(QSs, Delivery),
+ {X0 ++ X, A0 ++ A}
+ end, {[], []}, ByType),
+ State = lists:foldl(
+ fun({Q, S}, Acc) ->
+ Ctx = get_ctx(Q, Acc),
+ set_ctx(qref(Q), Ctx#ctx{state = S}, Acc)
+ end, State0, Xs),
+ return_ok(State, Actions).
+
+
+-spec settle(queue_ref(), settle_op(), rabbit_types:ctag(),
+ [non_neg_integer()], state()) ->
+ {ok, state(), actions()} |
+ {'protocol_error', Type :: atom(), Reason :: string(), Args :: term()}.
+settle(QRef, Op, CTag, MsgIds, Ctxs)
+ when ?QREF(QRef) ->
+ case get_ctx(QRef, Ctxs, undefined) of
+ undefined ->
+ %% if we receive a settlement and there is no queue state it means
+ %% the queue was deleted with active consumers
+ {ok, Ctxs, []};
+ #ctx{state = State0,
+ module = Mod} = Ctx ->
+ case Mod:settle(Op, CTag, MsgIds, State0) of
+ {State, Actions} ->
+ {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions};
+ Err ->
+ Err
+ end
+ end.
+
+-spec credit(amqqueue:amqqueue() | queue_ref(),
+ rabbit_types:ctag(), non_neg_integer(),
+ boolean(), state()) -> {ok, state(), actions()}.
+credit(Q, CTag, Credit, Drain, Ctxs) ->
+ #ctx{state = State0,
+ module = Mod} = Ctx = get_ctx(Q, Ctxs),
+ {State, Actions} = Mod:credit(CTag, Credit, Drain, State0),
+ {ok, set_ctx(Q, Ctx#ctx{state = State}, Ctxs), Actions}.
+
+-spec dequeue(amqqueue:amqqueue(), boolean(),
+ pid(), rabbit_types:ctag(), state()) ->
+ {ok, non_neg_integer(), term(), state()} |
+ {empty, state()}.
+dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
+ #ctx{state = State0} = Ctx = get_ctx(Q, Ctxs),
+ Mod = amqqueue:get_type(Q),
+ case Mod:dequeue(NoAck, LimiterPid, CTag, State0) of
+ {ok, Num, Msg, State} ->
+ {ok, Num, Msg, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
+ {empty, State} ->
+ {empty, set_ctx(Q, Ctx#ctx{state = State}, Ctxs)};
+ {error, _} = Err ->
+ Err;
+ {protocol_error, _, _, _} = Err ->
+ Err
+ end.
+
+get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) ->
+ Ref = qref(Q),
+ case Contexts of
+ #{Ref := #ctx{module = Mod,
+ state = State} = Ctx} ->
+ Ctx#ctx{state = Mod:update(Q, State)};
+ _ ->
+ %% not found - initialize
+ Mod = amqqueue:get_type(Q),
+ Name = amqqueue:get_name(Q),
+ #ctx{module = Mod,
+ name = Name,
+ state = Mod:init(Q)}
+ end;
+get_ctx(QRef, Contexts) when ?QREF(QRef) ->
+ case get_ctx(QRef, Contexts, undefined) of
+ undefined ->
+ exit({queue_context_not_found, QRef});
+ Ctx ->
+ Ctx
+ end.
+
+get_ctx(QRef, #?STATE{ctxs = Contexts}, Default) ->
+ Ref = qref(QRef),
+ %% if we use a QRef it should always be initialised
+ case maps:get(Ref, Contexts, undefined) of
+ #ctx{} = Ctx ->
+ Ctx;
+ undefined ->
+ Default
+ end.
+
+set_ctx(Q, Ctx, #?STATE{ctxs = Contexts} = State)
+ when ?is_amqqueue(Q) ->
+ Ref = qref(Q),
+ State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)};
+set_ctx(QRef, Ctx, #?STATE{ctxs = Contexts} = State) ->
+ Ref = qref(QRef),
+ State#?STATE{ctxs = maps:put(Ref, Ctx, Contexts)}.
+
+qref(#resource{kind = queue} = QName) ->
+ QName;
+qref(Q) when ?is_amqqueue(Q) ->
+ amqqueue:get_name(Q).
+
+return_ok(State0, []) ->
+ {ok, State0, []};
+return_ok(State0, Actions0) ->
+ {State, Actions} =
+ lists:foldl(
+ fun({monitor, Pid, QRef},
+ {#?STATE{monitor_registry = M0} = S0, A0}) ->
+ case M0 of
+ #{Pid := QRef} ->
+ %% already monitored by the qref
+ {S0, A0};
+ #{Pid := _} ->
+ %% TODO: allow multiple Qrefs to monitor the same pid
+ exit(return_ok_duplicate_monitored_pid);
+ _ ->
+ _ = erlang:monitor(process, Pid),
+ M = M0#{Pid => QRef},
+ {S0#?STATE{monitor_registry = M}, A0}
+ end;
+ (Act, {S, A0}) ->
+ {S, [Act | A0]}
+ end, {State0, []}, Actions0),
+ {ok, State, lists:reverse(Actions)}.