summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <klishinm@vmware.com>2021-01-20 20:59:35 +0500
committerGitHub <noreply@github.com>2021-01-20 20:59:35 +0500
commit9ec4983f941a1306b1158c626dc5ea8f594e3bc9 (patch)
tree0772dcb3db7ce8d129498eebfd2a0cc65c214239
parente87581f14c5641d15889f8f67769829b456134af (diff)
parent6d1f3a160be772e1d53ab431f6b47540d912141c (diff)
downloadrabbitmq-server-git-9ec4983f941a1306b1158c626dc5ea8f594e3bc9.tar.gz
Merge pull request #2728 from rabbitmq/qq-bug-fix-service-queue-conv
Fix crash bug in QQ state conversion
-rw-r--r--deps/rabbit/src/rabbit_fifo.erl6
-rw-r--r--deps/rabbit/test/rabbit_fifo_SUITE.erl26
2 files changed, 31 insertions, 1 deletions
diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl
index 9205f897ca..37aece9d6c 100644
--- a/deps/rabbit/src/rabbit_fifo.erl
+++ b/deps/rabbit/src/rabbit_fifo.erl
@@ -512,6 +512,10 @@ apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, State, update_config(Conf, State), []);
apply(_Meta, {machine_version, 0, 1}, V0State) ->
State = convert_v0_to_v1(V0State),
+ {State, ok, []};
+apply(_Meta, Cmd, State) ->
+ %% handle unhandled commands gracefully
+ rabbit_log:debug("rabbit_fifo: unhandled command ~W", [Cmd, 10]),
{State, ok, []}.
convert_v0_to_v1(V0State0) ->
@@ -532,7 +536,7 @@ convert_v0_to_v1(V0State0) ->
list_to_tuple(tuple_to_list(C0) ++ [0])
end, V0Cons),
V0SQ = rabbit_fifo_v0:get_field(service_queue, V0State),
- V1SQ = priority_queue:from_list(queue:to_list(V0SQ)),
+ V1SQ = priority_queue:from_list([{0, C} || C <- queue:to_list(V0SQ)]),
Cfg = #cfg{name = rabbit_fifo_v0:get_cfg_field(name, V0State),
resource = rabbit_fifo_v0:get_cfg_field(resource, V0State),
release_cursor_interval = rabbit_fifo_v0:get_cfg_field(release_cursor_interval, V0State),
diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl
index a9d541238d..1b8761f90e 100644
--- a/deps/rabbit/test/rabbit_fifo_SUITE.erl
+++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl
@@ -1518,6 +1518,32 @@ machine_version_test(_) ->
?assert(priority_queue:is_queue(S)),
ok.
+machine_version_waiting_consumer_test(_) ->
+ V0 = rabbit_fifo_v0,
+ S0 = V0:init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
+ Idx = 1,
+ {#rabbit_fifo{}, ok, []} = apply(meta(Idx), {machine_version, 0, 1}, S0),
+
+ Cid = {atom_to_binary(?FUNCTION_NAME, utf8), self()},
+ Entries = [
+ {1, rabbit_fifo_v0:make_enqueue(self(), 1, banana)},
+ {2, rabbit_fifo_v0:make_enqueue(self(), 2, apple)},
+ {3, rabbit_fifo_v0:make_checkout(Cid, {auto, 5, unsettled}, #{})}
+ ],
+ {S1, _Effects} = rabbit_fifo_v0_SUITE:run_log(S0, Entries),
+ Self = self(),
+ {#rabbit_fifo{enqueuers = #{Self := #enqueuer{}},
+ consumers = #{Cid := #consumer{priority = 0}},
+ service_queue = S,
+ messages = Msgs}, ok, []} = apply(meta(Idx),
+ {machine_version, 0, 1}, S1),
+ %% validate message conversion to lqueue
+ ?assertEqual(0, lqueue:len(Msgs)),
+ ?assert(priority_queue:is_queue(S)),
+ ?assertEqual(1, priority_queue:len(S)),
+ ok.
+
queue_ttl_test(_) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"test">>),
Conf = #{name => ?FUNCTION_NAME,