diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-11-03 16:54:28 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-11-03 16:54:28 +0100 |
commit | 5a15c6c520b035ad2fada10d5af147c31e0102da (patch) | |
tree | c9a63381b27a21186a1f3919b0c380c3cfc052f4 | |
parent | 3d2487323d048397a9e89af9acbdf8ff7489af5c (diff) | |
parent | edd4d75a9b832f6148f1f3fe1a239afe59450a18 (diff) | |
download | rabbitmq-server-git-5a15c6c520b035ad2fada10d5af147c31e0102da.tar.gz |
Merge pull request #2492 from rabbitmq/ack-after-queue-delete
Avoid crash when acking deleted queue
-rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
-rw-r--r-- | src/rabbit_queue_type.erl | 39 | ||||
-rw-r--r-- | test/queue_type_SUITE.erl | 43 |
3 files changed, 58 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fb9a428a0e..119c6dd8e9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1472,7 +1472,6 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, QName = amqqueue:get_name(Q), ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), - % QRef = qpid_to_ref(QPid), QCons1 = case maps:find(QName, QCons) of error -> QCons; diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 9e3a521206..569daa7bb2 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -36,9 +36,6 @@ is_server_named_allowed/1 ]). -%% temporary --export([with/3]). - %% gah what is a good identity of a classic queue including all replicas -type queue_name() :: rabbit_types:r(queue). -type queue_ref() :: queue_name() | atom(). @@ -249,12 +246,9 @@ stat(Q) -> remove(QRef, #?STATE{ctxs = Ctxs0} = State) -> case maps:take(QRef, Ctxs0) of error -> - State#?STATE{ctxs = Ctxs0}; + State; {_, Ctxs} -> - %% remove all linked queue refs - State#?STATE{ctxs = maps:filter(fun (_, V) -> - V == QRef - end, Ctxs)} + State#?STATE{ctxs = Ctxs} end. -spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) -> @@ -307,7 +301,7 @@ is_policy_applicable(Q, Policy) -> is_server_named_allowed(Type) -> Capabilities = Type:capabilities(), maps:get(server_named, Capabilities, false). - + -spec init() -> state(). init() -> #?STATE{}. @@ -329,9 +323,9 @@ new(Q, State) when ?is_amqqueue(Q) -> -spec consume(amqqueue:amqqueue(), consume_spec(), state()) -> {ok, state(), actions()} | {error, term()}. consume(Q, Spec, State) -> - #ctx{state = State0} = Ctx = get_ctx(Q, State), + #ctx{state = CtxState0} = Ctx = get_ctx(Q, State), Mod = amqqueue:get_type(Q), - case Mod:consume(Q, Spec, State0) of + case Mod:consume(Q, Spec, CtxState0) of {ok, CtxState, Actions} -> return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions); Err -> @@ -466,10 +460,16 @@ deliver(Qs, Delivery, #?STATE{} = State0) -> [non_neg_integer()], state()) -> {ok, state(), actions()}. settle(QRef, Op, CTag, MsgIds, Ctxs) when ?QREF(QRef) -> - #ctx{state = State0, - module = Mod} = Ctx = get_ctx(QRef, Ctxs), - {State, Actions} = Mod:settle(Op, CTag, MsgIds, State0), - {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions}. + case get_ctx(QRef, Ctxs, undefined) of + undefined -> + %% if we receive a settlement and there is no queue state it means + %% the queue was deleted with active consumers + {ok, Ctxs, []}; + #ctx{state = State0, + module = Mod} = Ctx -> + {State, Actions} = Mod:settle(Op, CTag, MsgIds, State0), + {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions} + end. -spec credit(amqqueue:amqqueue() | queue_ref(), rabbit_types:ctag(), non_neg_integer(), @@ -496,13 +496,6 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) -> Err end. -%% temporary -with(QRef, Fun, Ctxs) -> - #ctx{state = State0} = Ctx = get_ctx(QRef, Ctxs), - {Res, State} = Fun(State0), - {Res, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs)}. - - get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) -> Ref = qref(Q), case Contexts of @@ -561,7 +554,7 @@ return_ok(State0, Actions0) -> {S0, A0}; #{Pid := _} -> %% TODO: allow multiple Qrefs to monitor the same pid - exit(return_ok_duplicate_montored_pid); + exit(return_ok_duplicate_monitored_pid); _ -> _ = erlang:monitor(process, Pid), M = M0#{Pid => QRef}, diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl index b7d132e737..aed5ad4ccb 100644 --- a/test/queue_type_SUITE.erl +++ b/test/queue_type_SUITE.erl @@ -22,7 +22,8 @@ all() -> all_tests() -> [ - smoke + smoke, + ack_after_queue_delete ]. groups() -> @@ -171,7 +172,44 @@ smoke(Config) -> basic_ack(Ch, basic_get(Ch, QName)), ok. +ack_after_queue_delete(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, <<"msg1">>), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> + ct:fail("confirm nack - expected ack") + after 2500 -> + flush(), + exit(confirm_timeout) + end, + + DTag = basic_get(Ch, QName), + + ChRef = erlang:monitor(process, Ch), + #'queue.delete_ok'{} = delete(Ch, QName), + + basic_ack(Ch, DTag), + %% assert no channel error + receive + {'DOWN', ChRef, process, _, _} -> + ct:fail("unexpected channel closure") + after 1000 -> + ok + end, + flush(), + ok. + %% Utility +%% delete_queues() -> [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()]. @@ -182,6 +220,9 @@ declare(Ch, Q, Args) -> auto_delete = false, arguments = Args}). +delete(Ch, Q) -> + amqp_channel:call(Ch, #'queue.delete'{queue = Q}). + publish(Ch, Queue, Msg) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, |