summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-11-03 16:54:28 +0100
committerGitHub <noreply@github.com>2020-11-03 16:54:28 +0100
commit5a15c6c520b035ad2fada10d5af147c31e0102da (patch)
treec9a63381b27a21186a1f3919b0c380c3cfc052f4
parent3d2487323d048397a9e89af9acbdf8ff7489af5c (diff)
parentedd4d75a9b832f6148f1f3fe1a239afe59450a18 (diff)
downloadrabbitmq-server-git-5a15c6c520b035ad2fada10d5af147c31e0102da.tar.gz
Merge pull request #2492 from rabbitmq/ack-after-queue-delete
Avoid crash when acking deleted queue
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_queue_type.erl39
-rw-r--r--test/queue_type_SUITE.erl43
3 files changed, 58 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index fb9a428a0e..119c6dd8e9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1472,7 +1472,6 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
QName = amqqueue:get_name(Q),
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
- % QRef = qpid_to_ref(QPid),
QCons1 =
case maps:find(QName, QCons) of
error -> QCons;
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 9e3a521206..569daa7bb2 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -36,9 +36,6 @@
is_server_named_allowed/1
]).
-%% temporary
--export([with/3]).
-
%% gah what is a good identity of a classic queue including all replicas
-type queue_name() :: rabbit_types:r(queue).
-type queue_ref() :: queue_name() | atom().
@@ -249,12 +246,9 @@ stat(Q) ->
remove(QRef, #?STATE{ctxs = Ctxs0} = State) ->
case maps:take(QRef, Ctxs0) of
error ->
- State#?STATE{ctxs = Ctxs0};
+ State;
{_, Ctxs} ->
- %% remove all linked queue refs
- State#?STATE{ctxs = maps:filter(fun (_, V) ->
- V == QRef
- end, Ctxs)}
+ State#?STATE{ctxs = Ctxs}
end.
-spec info(amqqueue:amqqueue(), all_keys | rabbit_types:info_keys()) ->
@@ -307,7 +301,7 @@ is_policy_applicable(Q, Policy) ->
is_server_named_allowed(Type) ->
Capabilities = Type:capabilities(),
maps:get(server_named, Capabilities, false).
-
+
-spec init() -> state().
init() ->
#?STATE{}.
@@ -329,9 +323,9 @@ new(Q, State) when ?is_amqqueue(Q) ->
-spec consume(amqqueue:amqqueue(), consume_spec(), state()) ->
{ok, state(), actions()} | {error, term()}.
consume(Q, Spec, State) ->
- #ctx{state = State0} = Ctx = get_ctx(Q, State),
+ #ctx{state = CtxState0} = Ctx = get_ctx(Q, State),
Mod = amqqueue:get_type(Q),
- case Mod:consume(Q, Spec, State0) of
+ case Mod:consume(Q, Spec, CtxState0) of
{ok, CtxState, Actions} ->
return_ok(set_ctx(Q, Ctx#ctx{state = CtxState}, State), Actions);
Err ->
@@ -466,10 +460,16 @@ deliver(Qs, Delivery, #?STATE{} = State0) ->
[non_neg_integer()], state()) -> {ok, state(), actions()}.
settle(QRef, Op, CTag, MsgIds, Ctxs)
when ?QREF(QRef) ->
- #ctx{state = State0,
- module = Mod} = Ctx = get_ctx(QRef, Ctxs),
- {State, Actions} = Mod:settle(Op, CTag, MsgIds, State0),
- {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions}.
+ case get_ctx(QRef, Ctxs, undefined) of
+ undefined ->
+ %% if we receive a settlement and there is no queue state it means
+ %% the queue was deleted with active consumers
+ {ok, Ctxs, []};
+ #ctx{state = State0,
+ module = Mod} = Ctx ->
+ {State, Actions} = Mod:settle(Op, CTag, MsgIds, State0),
+ {ok, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs), Actions}
+ end.
-spec credit(amqqueue:amqqueue() | queue_ref(),
rabbit_types:ctag(), non_neg_integer(),
@@ -496,13 +496,6 @@ dequeue(Q, NoAck, LimiterPid, CTag, Ctxs) ->
Err
end.
-%% temporary
-with(QRef, Fun, Ctxs) ->
- #ctx{state = State0} = Ctx = get_ctx(QRef, Ctxs),
- {Res, State} = Fun(State0),
- {Res, set_ctx(QRef, Ctx#ctx{state = State}, Ctxs)}.
-
-
get_ctx(Q, #?STATE{ctxs = Contexts}) when ?is_amqqueue(Q) ->
Ref = qref(Q),
case Contexts of
@@ -561,7 +554,7 @@ return_ok(State0, Actions0) ->
{S0, A0};
#{Pid := _} ->
%% TODO: allow multiple Qrefs to monitor the same pid
- exit(return_ok_duplicate_montored_pid);
+ exit(return_ok_duplicate_monitored_pid);
_ ->
_ = erlang:monitor(process, Pid),
M = M0#{Pid => QRef},
diff --git a/test/queue_type_SUITE.erl b/test/queue_type_SUITE.erl
index b7d132e737..aed5ad4ccb 100644
--- a/test/queue_type_SUITE.erl
+++ b/test/queue_type_SUITE.erl
@@ -22,7 +22,8 @@ all() ->
all_tests() ->
[
- smoke
+ smoke,
+ ack_after_queue_delete
].
groups() ->
@@ -171,7 +172,44 @@ smoke(Config) ->
basic_ack(Ch, basic_get(Ch, QName)),
ok.
+ack_after_queue_delete(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QName = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, QName, <<"msg1">>),
+ ct:pal("waiting for confirms from ~s", [QName]),
+ ok = receive
+ #'basic.ack'{} -> ok;
+ #'basic.nack'{} ->
+ ct:fail("confirm nack - expected ack")
+ after 2500 ->
+ flush(),
+ exit(confirm_timeout)
+ end,
+
+ DTag = basic_get(Ch, QName),
+
+ ChRef = erlang:monitor(process, Ch),
+ #'queue.delete_ok'{} = delete(Ch, QName),
+
+ basic_ack(Ch, DTag),
+ %% assert no channel error
+ receive
+ {'DOWN', ChRef, process, _, _} ->
+ ct:fail("unexpected channel closure")
+ after 1000 ->
+ ok
+ end,
+ flush(),
+ ok.
+
%% Utility
+%%
delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].
@@ -182,6 +220,9 @@ declare(Ch, Q, Args) ->
auto_delete = false,
arguments = Args}).
+delete(Ch, Q) ->
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
publish(Ch, Queue, Msg) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},