summaryrefslogtreecommitdiff
path: root/deps/rabbit/src/rabbit_classic_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbit/src/rabbit_classic_queue.erl')
-rw-r--r--deps/rabbit/src/rabbit_classic_queue.erl527
1 files changed, 527 insertions, 0 deletions
diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl
new file mode 100644
index 0000000000..e53c0aecc2
--- /dev/null
+++ b/deps/rabbit/src/rabbit_classic_queue.erl
@@ -0,0 +1,527 @@
+-module(rabbit_classic_queue).
+-behaviour(rabbit_queue_type).
+
+-include("amqqueue.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-record(msg_status, {pending :: [pid()],
+ confirmed = [] :: [pid()]}).
+
+-record(?MODULE, {pid :: undefined | pid(), %% the current master pid
+ qref :: term(), %% TODO
+ unconfirmed = #{} ::
+ #{non_neg_integer() => #msg_status{}}}).
+-define(STATE, ?MODULE).
+
+-opaque state() :: #?STATE{}.
+
+-export_type([state/0]).
+
+-export([
+ is_enabled/0,
+ declare/2,
+ delete/4,
+ is_recoverable/1,
+ recover/2,
+ purge/1,
+ policy_changed/1,
+ stat/1,
+ init/1,
+ close/1,
+ update/2,
+ consume/3,
+ cancel/5,
+ handle_event/2,
+ deliver/2,
+ settle/4,
+ credit/4,
+ dequeue/4,
+ info/2,
+ state_info/1,
+ capabilities/0
+ ]).
+
+-export([delete_crashed/1,
+ delete_crashed/2,
+ delete_crashed_internal/2]).
+
+-export([confirm_to_sender/3,
+ send_rejection/3,
+ send_queue_event/3]).
+
+is_enabled() -> true.
+
+declare(Q, Node) when ?amqqueue_is_classic(Q) ->
+ QName = amqqueue:get_name(Q),
+ VHost = amqqueue:get_vhost(Q),
+ Node1 = case Node of
+ {ignore_location, Node0} ->
+ Node0;
+ _ ->
+ case rabbit_queue_master_location_misc:get_location(Q) of
+ {ok, Node0} -> Node0;
+ _ -> Node
+ end
+ end,
+ Node1 = rabbit_mirror_queue_misc:initial_queue_node(Q, Node1),
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node1) of
+ {ok, _} ->
+ gen_server2:call(
+ rabbit_amqqueue_sup_sup:start_queue_process(Node1, Q, declare),
+ {init, new}, infinity);
+ {error, Error} ->
+ {protocol_error, internal_error, "Cannot declare a queue '~s' on node '~s': ~255p",
+ [rabbit_misc:rs(QName), Node1, Error]}
+ end.
+
+delete(Q, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q) ->
+ case wait_for_promoted_or_stopped(Q) of
+ {promoted, Q1} ->
+ QPid = amqqueue:get_pid(Q1),
+ delegate:invoke(QPid, {gen_server2, call,
+ [{delete, IfUnused, IfEmpty, ActingUser},
+ infinity]});
+ {stopped, Q1} ->
+ #resource{name = Name, virtual_host = Vhost} = amqqueue:get_name(Q1),
+ case IfEmpty of
+ true ->
+ rabbit_log:error("Queue ~s in vhost ~s has its master node down and "
+ "no mirrors available or eligible for promotion. "
+ "The queue may be non-empty. "
+ "Refusing to force-delete.",
+ [Name, Vhost]),
+ {error, not_empty};
+ false ->
+ rabbit_log:warning("Queue ~s in vhost ~s has its master node is down and "
+ "no mirrors available or eligible for promotion. "
+ "Forcing queue deletion.",
+ [Name, Vhost]),
+ delete_crashed_internal(Q1, ActingUser),
+ {ok, 0}
+ end;
+ {error, not_found} ->
+ %% Assume the queue was deleted
+ {ok, 0}
+ end.
+
+is_recoverable(Q) when ?is_amqqueue(Q) ->
+ Node = node(),
+ Node =:= node(amqqueue:get_pid(Q)) andalso
+ %% Terminations on node down will not remove the rabbit_queue
+ %% record if it is a mirrored queue (such info is now obtained from
+ %% the policy). Thus, we must check if the local pid is alive
+ %% - if the record is present - in order to restart.
+ (mnesia:read(rabbit_queue, amqqueue:get_name(Q), read) =:= []
+ orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))).
+
+recover(VHost, Queues) ->
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ %% We rely on BQ:start/1 returning the recovery terms in the same
+ %% order as the supplied queue names, so that we can zip them together
+ %% for further processing in recover_durable_queues.
+ {ok, OrderedRecoveryTerms} =
+ BQ:start(VHost, [amqqueue:get_name(Q) || Q <- Queues]),
+ case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
+ {ok, _} ->
+ RecoveredQs = recover_durable_queues(lists:zip(Queues,
+ OrderedRecoveryTerms)),
+ RecoveredNames = [amqqueue:get_name(Q) || Q <- RecoveredQs],
+ FailedQueues = [Q || Q <- Queues,
+ not lists:member(amqqueue:get_name(Q), RecoveredNames)],
+ {RecoveredQs, FailedQueues};
+ {error, Reason} ->
+ rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
+ throw({error, Reason})
+ end.
+
+-spec policy_changed(amqqueue:amqqueue()) -> ok.
+policy_changed(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ gen_server2:cast(QPid, policy_changed).
+
+stat(Q) ->
+ delegate:invoke(amqqueue:get_pid(Q),
+ {gen_server2, call, [stat, infinity]}).
+
+-spec init(amqqueue:amqqueue()) -> state().
+init(Q) when ?amqqueue_is_classic(Q) ->
+ QName = amqqueue:get_name(Q),
+ #?STATE{pid = amqqueue:get_pid(Q),
+ qref = QName}.
+
+-spec close(state()) -> ok.
+close(_State) ->
+ ok.
+
+-spec update(amqqueue:amqqueue(), state()) -> state().
+update(Q, #?STATE{pid = Pid} = State) when ?amqqueue_is_classic(Q) ->
+ case amqqueue:get_pid(Q) of
+ Pid ->
+ State;
+ NewPid ->
+ %% master pid is different, update
+ State#?STATE{pid = NewPid}
+ end.
+
+consume(Q, Spec, State) when ?amqqueue_is_classic(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ QRef = amqqueue:get_name(Q),
+ #{no_ack := NoAck,
+ channel_pid := ChPid,
+ limiter_pid := LimiterPid,
+ limiter_active := LimiterActive,
+ prefetch_count := ConsumerPrefetchCount,
+ consumer_tag := ConsumerTag,
+ exclusive_consume := ExclusiveConsume,
+ args := Args,
+ ok_msg := OkMsg,
+ acting_user := ActingUser} = Spec,
+ case delegate:invoke(QPid,
+ {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+ ExclusiveConsume, Args, OkMsg, ActingUser},
+ infinity]}) of
+ ok ->
+ %% ask the host process to monitor this pid
+ %% TODO: track pids as they change
+ {ok, State#?STATE{pid = QPid}, [{monitor, QPid, QRef}]};
+ Err ->
+ Err
+ end.
+
+cancel(Q, ConsumerTag, OkMsg, ActingUser, State) ->
+ QPid = amqqueue:get_pid(Q),
+ case delegate:invoke(QPid, {gen_server2, call,
+ [{basic_cancel, self(), ConsumerTag,
+ OkMsg, ActingUser}, infinity]}) of
+ ok ->
+ {ok, State};
+ Err -> Err
+ end.
+
+-spec settle(rabbit_queue_type:settle_op(), rabbit_types:ctag(),
+ [non_neg_integer()], state()) ->
+ {state(), rabbit_queue_type:actions()}.
+settle(complete, _CTag, MsgIds, State) ->
+ Pid = State#?STATE.pid,
+ delegate:invoke_no_result(Pid,
+ {gen_server2, cast, [{ack, MsgIds, self()}]}),
+ {State, []};
+settle(Op, _CTag, MsgIds, State) ->
+ ChPid = self(),
+ ok = delegate:invoke_no_result(State#?STATE.pid,
+ {gen_server2, cast,
+ [{reject, Op == requeue, MsgIds, ChPid}]}),
+ {State, []}.
+
+credit(CTag, Credit, Drain, State) ->
+ ChPid = self(),
+ delegate:invoke_no_result(State#?STATE.pid,
+ {gen_server2, cast,
+ [{credit, ChPid, CTag, Credit, Drain}]}),
+ {State, []}.
+
+handle_event({confirm, MsgSeqNos, Pid}, #?STATE{qref = QRef,
+ unconfirmed = U0} = State) ->
+ %% confirms should never result in rejections
+ {Unconfirmed, ConfirmedSeqNos, []} =
+ settle_seq_nos(MsgSeqNos, Pid, U0, confirm),
+ Actions = [{settled, QRef, ConfirmedSeqNos}],
+ %% handle confirm event from queues
+ %% in this case the classic queue should track each individual publish and
+ %% the processes involved and only emit a settle action once they have all
+ %% been received (or DOWN has been received).
+ %% Hence this part of the confirm logic is queue specific.
+ {ok, State#?STATE{unconfirmed = Unconfirmed}, Actions};
+handle_event({reject_publish, SeqNo, _QPid},
+ #?STATE{qref = QRef,
+ unconfirmed = U0} = State) ->
+ %% It does not matter which queue rejected the message,
+ %% if any queue did, it should not be confirmed.
+ {U, Rejected} = reject_seq_no(SeqNo, U0),
+ Actions = [{rejected, QRef, Rejected}],
+ {ok, State#?STATE{unconfirmed = U}, Actions};
+handle_event({down, Pid, Info}, #?STATE{qref = QRef,
+ pid = MasterPid,
+ unconfirmed = U0} = State0) ->
+ Actions0 = case Pid =:= MasterPid of
+ true ->
+ [{queue_down, QRef}];
+ false ->
+ []
+ end,
+ case rabbit_misc:is_abnormal_exit(Info) of
+ false when Info =:= normal andalso Pid == MasterPid ->
+ %% queue was deleted and masterpid is down
+ eol;
+ false ->
+ %% this assumes the mirror isn't part of the active set
+ MsgSeqNos = maps:keys(
+ maps:filter(fun (_, #msg_status{pending = Pids}) ->
+ lists:member(Pid, Pids)
+ end, U0)),
+ {Unconfirmed, Settled, Rejected} =
+ settle_seq_nos(MsgSeqNos, Pid, U0, down),
+ Actions = settlement_action(
+ settled, QRef, Settled,
+ settlement_action(rejected, QRef, Rejected, Actions0)),
+ {ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions};
+ true ->
+ %% any abnormal exit should be considered a full reject of the
+ %% oustanding message ids - If the message didn't get to all
+ %% mirrors we have to assume it will never get there
+ MsgIds = maps:fold(
+ fun (SeqNo, Status, Acc) ->
+ case lists:member(Pid, Status#msg_status.pending) of
+ true ->
+ [SeqNo | Acc];
+ false ->
+ Acc
+ end
+ end, [], U0),
+ U = maps:without(MsgIds, U0),
+ {ok, State0#?STATE{unconfirmed = U},
+ [{rejected, QRef, MsgIds} | Actions0]}
+ end;
+handle_event({send_credit_reply, _} = Action, State) ->
+ {ok, State, [Action]}.
+
+settlement_action(_Type, _QRef, [], Acc) ->
+ Acc;
+settlement_action(Type, QRef, MsgSeqs, Acc) ->
+ [{Type, QRef, MsgSeqs} | Acc].
+
+-spec deliver([{amqqueue:amqqueue(), state()}],
+ Delivery :: term()) ->
+ {[{amqqueue:amqqueue(), state()}], rabbit_queue_type:actions()}.
+deliver(Qs0, #delivery{flow = Flow,
+ msg_seq_no = MsgNo,
+ message = #basic_message{exchange_name = _Ex},
+ confirm = _Confirm} = Delivery) ->
+ %% TODO: record master and slaves for confirm processing
+ {MPids, SPids, Qs, Actions} = qpids(Qs0, MsgNo),
+ QPids = MPids ++ SPids,
+ case Flow of
+ %% Here we are tracking messages sent by the rabbit_channel
+ %% process. We are accessing the rabbit_channel process
+ %% dictionary.
+ flow -> [credit_flow:send(QPid) || QPid <- QPids],
+ [credit_flow:send(QPid) || QPid <- SPids];
+ noflow -> ok
+ end,
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
+ delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
+ delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
+ {Qs, Actions}.
+
+
+-spec dequeue(NoAck :: boolean(), LimiterPid :: pid(),
+ rabbit_types:ctag(), state()) ->
+ {ok, Count :: non_neg_integer(), rabbit_amqqueue:qmsg(), state()} |
+ {empty, state()}.
+dequeue(NoAck, LimiterPid, _CTag, State) ->
+ QPid = State#?STATE.pid,
+ case delegate:invoke(QPid, {gen_server2, call,
+ [{basic_get, self(), NoAck, LimiterPid}, infinity]}) of
+ empty ->
+ {empty, State};
+ {ok, Count, Msg} ->
+ {ok, Count, Msg, State}
+ end.
+
+-spec state_info(state()) -> #{atom() := term()}.
+state_info(_State) ->
+ #{}.
+
+%% general queue info
+-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
+ rabbit_types:infos().
+info(Q, Items) ->
+ QPid = amqqueue:get_pid(Q),
+ Req = case Items of
+ all_keys -> info;
+ _ -> {info, Items}
+ end,
+ case delegate:invoke(QPid, {gen_server2, call, [Req, infinity]}) of
+ {ok, Result} ->
+ Result;
+ {error, _Err} ->
+ [];
+ Result when is_list(Result) ->
+ %% this is a backwards compatibility clause
+ Result
+ end.
+
+-spec purge(amqqueue:amqqueue()) ->
+ {ok, non_neg_integer()}.
+purge(Q) when ?is_amqqueue(Q) ->
+ QPid = amqqueue:get_pid(Q),
+ delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}).
+
+qpids(Qs, MsgNo) ->
+ lists:foldl(
+ fun ({Q, S0}, {MPidAcc, SPidAcc, Qs0, Actions0}) ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
+ QRef = amqqueue:get_name(Q),
+ Actions = [{monitor, QPid, QRef}
+ | [{monitor, P, QRef} || P <- SPids]] ++ Actions0,
+ %% confirm record only if MsgNo isn't undefined
+ S = case S0 of
+ #?STATE{unconfirmed = U0} ->
+ Rec = [QPid | SPids],
+ U = case MsgNo of
+ undefined ->
+ U0;
+ _ ->
+ U0#{MsgNo => #msg_status{pending = Rec}}
+ end,
+ S0#?STATE{pid = QPid,
+ unconfirmed = U};
+ stateless ->
+ S0
+ end,
+ {[QPid | MPidAcc], SPidAcc ++ SPids,
+ [{Q, S} | Qs0], Actions}
+ end, {[], [], [], []}, Qs).
+
+%% internal-ish
+-spec wait_for_promoted_or_stopped(amqqueue:amqqueue()) ->
+ {promoted, amqqueue:amqqueue()} |
+ {stopped, amqqueue:amqqueue()} |
+ {error, not_found}.
+wait_for_promoted_or_stopped(Q0) ->
+ QName = amqqueue:get_name(Q0),
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, Q} ->
+ QPid = amqqueue:get_pid(Q),
+ SPids = amqqueue:get_slave_pids(Q),
+ case rabbit_mnesia:is_process_alive(QPid) of
+ true -> {promoted, Q};
+ false ->
+ case lists:any(fun(Pid) ->
+ rabbit_mnesia:is_process_alive(Pid)
+ end, SPids) of
+ %% There is a live slave. May be promoted
+ true ->
+ timer:sleep(100),
+ wait_for_promoted_or_stopped(Q);
+ %% All slave pids are stopped.
+ %% No process left for the queue
+ false -> {stopped, Q}
+ end
+ end;
+ {error, not_found} ->
+ {error, not_found}
+ end.
+
+-spec delete_crashed(amqqueue:amqqueue()) -> ok.
+delete_crashed(Q) ->
+ delete_crashed(Q, ?INTERNAL_USER).
+
+delete_crashed(Q, ActingUser) ->
+ ok = rpc:call(amqqueue:qnode(Q), ?MODULE, delete_crashed_internal,
+ [Q, ActingUser]).
+
+delete_crashed_internal(Q, ActingUser) ->
+ QName = amqqueue:get_name(Q),
+ {ok, BQ} = application:get_env(rabbit, backing_queue_module),
+ BQ:delete_crashed(Q),
+ ok = rabbit_amqqueue:internal_delete(QName, ActingUser).
+
+recover_durable_queues(QueuesAndRecoveryTerms) ->
+ {Results, Failures} =
+ gen_server2:mcall(
+ [{rabbit_amqqueue_sup_sup:start_queue_process(node(), Q, recovery),
+ {init, {self(), Terms}}} || {Q, Terms} <- QueuesAndRecoveryTerms]),
+ [rabbit_log:error("Queue ~p failed to initialise: ~p~n",
+ [Pid, Error]) || {Pid, Error} <- Failures],
+ [Q || {_, {new, Q}} <- Results].
+
+capabilities() ->
+ #{policies => [<<"expires">>, <<"message-ttl">>, <<"dead-letter-exchange">>,
+ <<"dead-letter-routing-key">>, <<"max-length">>,
+ <<"max-length-bytes">>, <<"max-in-memory-length">>, <<"max-in-memory-bytes">>,
+ <<"max-priority">>, <<"overflow">>, <<"queue-mode">>,
+ <<"single-active-consumer">>, <<"delivery-limit">>,
+ <<"ha-mode">>, <<"ha-params">>, <<"ha-sync-mode">>,
+ <<"ha-promote-on-shutdown">>, <<"ha-promote-on-failure">>,
+ <<"queue-master-locator">>],
+ queue_arguments => [<<"x-expires">>, <<"x-message-ttl">>, <<"x-dead-letter-exchange">>,
+ <<"x-dead-letter-routing-key">>, <<"x-max-length">>,
+ <<"x-max-length-bytes">>, <<"x-max-in-memory-length">>,
+ <<"x-max-in-memory-bytes">>, <<"x-max-priority">>,
+ <<"x-overflow">>, <<"x-queue-mode">>, <<"x-single-active-consumer">>,
+ <<"x-queue-type">>, <<"x-queue-master-locator">>],
+ consumer_arguments => [<<"x-cancel-on-ha-failover">>,
+ <<"x-priority">>, <<"x-credit">>
+ ],
+ server_named => true}.
+
+reject_seq_no(SeqNo, U0) ->
+ reject_seq_no(SeqNo, U0, []).
+
+reject_seq_no(SeqNo, U0, Acc) ->
+ case maps:take(SeqNo, U0) of
+ {_, U} ->
+ {U, [SeqNo | Acc]};
+ error ->
+ {U0, Acc}
+ end.
+
+settle_seq_nos(MsgSeqNos, Pid, U0, Reason) ->
+ lists:foldl(
+ fun (SeqNo, {U, C0, R0}) ->
+ case U of
+ #{SeqNo := Status0} ->
+ case update_msg_status(Reason, Pid, Status0) of
+ #msg_status{pending = [],
+ confirmed = []} ->
+ %% no pending left and nothing confirmed
+ %% then we reject it
+ {maps:remove(SeqNo, U), C0, [SeqNo | R0]};
+ #msg_status{pending = [],
+ confirmed = _} ->
+ %% this can be confirmed as there are no pending
+ %% and confirmed isn't empty
+ {maps:remove(SeqNo, U), [SeqNo | C0], R0};
+ MsgStatus ->
+ {U#{SeqNo => MsgStatus}, C0, R0}
+ end;
+ _ ->
+ {U, C0, R0}
+ end
+ end, {U0, [], []}, MsgSeqNos).
+
+update_msg_status(confirm, Pid, #msg_status{pending = P,
+ confirmed = C} = S) ->
+ Rem = lists:delete(Pid, P),
+ S#msg_status{pending = Rem, confirmed = [Pid | C]};
+update_msg_status(down, Pid, #msg_status{pending = P} = S) ->
+ S#msg_status{pending = lists:delete(Pid, P)}.
+
+%% part of channel <-> queue api
+confirm_to_sender(Pid, QName, MsgSeqNos) ->
+ %% the stream queue included the queue type refactoring and thus requires
+ %% a different message format
+ Evt = case rabbit_ff_registry:is_enabled(stream_queue) of
+ true ->
+ {queue_event, QName, {confirm, MsgSeqNos, self()}};
+ false ->
+ {confirm, MsgSeqNos, self()}
+ end,
+ gen_server2:cast(Pid, Evt).
+
+send_rejection(Pid, QName, MsgSeqNo) ->
+ case rabbit_ff_registry:is_enabled(stream_queue) of
+ true ->
+ gen_server2:cast(Pid, {queue_event, QName,
+ {reject_publish, MsgSeqNo, self()}});
+ false ->
+ gen_server2:cast(Pid, {reject_publish, MsgSeqNo, self()})
+ end.
+
+send_queue_event(Pid, QName, Evt) ->
+ gen_server2:cast(Pid, {queue_event, QName, Evt}).