diff options
Diffstat (limited to 'deps/rabbit/test/rabbit_fifo_v0_SUITE.erl')
-rw-r--r-- | deps/rabbit/test/rabbit_fifo_v0_SUITE.erl | 1392 |
1 files changed, 1392 insertions, 0 deletions
diff --git a/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl new file mode 100644 index 0000000000..fcb84377de --- /dev/null +++ b/deps/rabbit/test/rabbit_fifo_v0_SUITE.erl @@ -0,0 +1,1392 @@ +-module(rabbit_fifo_v0_SUITE). + +%% rabbit_fifo unit tests suite + +-compile(export_all). + +-compile({no_auto_import, [apply/3]}). +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). +-include("src/rabbit_fifo_v0.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +%% replicate eunit like test resultion +all_tests() -> + [F || {F, _} <- ?MODULE:module_info(functions), + re:run(atom_to_list(F), "_test$") /= nomatch]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +-define(ASSERT_EFF(EfxPat, Effects), + ?ASSERT_EFF(EfxPat, true, Effects)). + +-define(ASSERT_EFF(EfxPat, Guard, Effects), + ?assert(lists:any(fun (EfxPat) when Guard -> true; + (_) -> false + end, Effects))). + +-define(ASSERT_NO_EFF(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +-define(assertNoEffect(EfxPat, Effects), + ?assert(not lists:any(fun (EfxPat) -> true; + (_) -> false + end, Effects))). + +test_init(Name) -> + init(#{name => Name, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(Name, utf8)), + release_cursor_interval => 0}). + +enq_enq_checkout_test(_) -> + Cid = {<<"enq_enq_checkout_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {_State3, _, Effects} = + apply(meta(3), + rabbit_fifo_v0:make_checkout(Cid, {once, 2, simple_prefetch}, #{}), + State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), + ok. + +credit_enq_enq_checkout_settled_credit_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _, Effects} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited}, #{}), State2), + ?ASSERT_EFF({monitor, _, _}, Effects), + Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; + (_) -> false + end, Effects), + ?assertEqual(1, length(Deliveries)), + %% settle the delivery this should _not_ result in further messages being + %% delivered + {State4, SettledEffects} = settle(Cid, 4, 1, State3), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, SettledEffects)), + %% granting credit (3) should deliver the second msg if the receivers + %% delivery count is (1) + {State5, CreditEffects} = credit(Cid, 5, 1, 1, false, State4), + % ?debugFmt("CreditEffects ~p ~n~p", [CreditEffects, State4]), + ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, CreditEffects), + {_State6, FinalEffects} = enq(6, 3, third, State5), + ?assertEqual(false, lists:any(fun ({send_msg, _, {delivery, _, _}, _}) -> + true; + (_) -> false + end, FinalEffects)), + ok. + +credit_with_drained_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + %% checkout with a single credit + {State1, _, _} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {auto, 1, credited},#{}), + State0), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 1, + delivery_count = 0}}}, + State1), + {State, Result, _} = + apply(meta(3), rabbit_fifo_v0:make_credit(Cid, 0, 5, true), State1), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 5}}}, + State), + ?assertEqual({multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 5}}]}, + Result), + ok. + +credit_and_drain_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + %% checkout without any initial credit (like AMQP 1.0 would) + {State3, _, CheckEffs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {auto, 0, credited}, #{}), + State2), + + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, {?FUNCTION_NAME, 2}}]}, + Effects} = apply(meta(4), rabbit_fifo_v0:make_credit(Cid, 4, 0, true), State3), + ?assertMatch(#?STATE{consumers = #{Cid := #consumer{credit = 0, + delivery_count = 4}}}, + State4), + + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, + {_, {_, second}}]}, _}, Effects), + {_State5, EnqEffs} = enq(5, 2, third, State4), + ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), + ok. + + + +enq_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + NumReady = 1, + {_State3, {dequeue, {0, {_, first}}, NumReady}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + ok. + +enq_enq_deq_deq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + % get returns a reply value + {State3, {dequeue, {0, {_, first}}, 1}, [{monitor, _, _}]} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State2), + {_State4, {dequeue, empty}} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State3), + ok. + +enq_enq_checkout_get_settled_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + % get returns a reply value + {_State2, {dequeue, {0, {_, first}}, _}, _Effs} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), + State1), + ok. + +checkout_get_empty_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State = test_init(test), + {_State2, {dequeue, empty}} = + apply(meta(1), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State), + ok. + +untracked_enq_deq_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = test_init(test), + {State1, _, _} = apply(meta(1), + rabbit_fifo_v0:make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}, _}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +release_cursor_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, _} = enq(2, 2, second, State1), + {State3, _} = check(Cid, 3, 10, State2), + % no release cursor effect at this point + {State4, _} = settle(Cid, 4, 1, State3), + {_Final, Effects1} = settle(Cid, 5, 0, State4), + % empty queue forwards release cursor all the way + ?ASSERT_EFF({release_cursor, 5, _}, Effects1), + ok. + +checkout_enq_settle_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)), + {State2, Effects0} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, + {delivery, ?FUNCTION_NAME, + [{0, {_, first}}]}, _}, + Effects0), + {State3, [_Inactive]} = enq(3, 2, second, State2), + {_, _Effects} = settle(Cid, 4, 0, State3), + % the release cursor is the smallest raft index that does not + % contribute to the state of the application + % ?ASSERT_EFF({release_cursor, 2, _}, Effects), + ok. + +out_of_order_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + % assert monitor was set up + ?ASSERT_EFF({monitor, _, _}, Effects2), + % enqueue seq num 3 and 4 before 2 + {State3, Effects3} = enq(3, 3, third, State2), + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects3), + {State4, Effects4} = enq(4, 4, fourth, State3), + % assert no further deliveries where made + ?assertNoEffect({send_msg, _, {delivery, _, _}, _}, Effects4), + {_State5, Effects5} = enq(5, 2, second, State4), + % assert two deliveries were now made + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, second}}, + {_, {_, third}}, + {_, {_, fourth}}]}, _}, + Effects5), + ok. + +out_of_order_first_enqueue_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + {State1, _} = check_n(Cid, 5, 5, test_init(test)), + {_State2, Effects2} = enq(2, 10, first, State1), + ?ASSERT_EFF({monitor, process, _}, Effects2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, + Effects2), + ok. + +duplicate_enqueue_test(_) -> + Cid = {<<"duplicate_enqueue_test">>, self()}, + {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)), + {State2, Effects2} = enq(2, 1, first, State1), + ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2), + {_State3, Effects3} = enq(3, 1, first, State2), + ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), + ok. + +return_test(_) -> + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + {State0, _} = enq(1, 1, msg, test_init(test)), + {State1, _} = check_auto(Cid, 2, State0), + {State2, _} = check_auto(Cid2, 3, State1), + {State3, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [0]), State2), + ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0, + State3#?STATE.consumers), + ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1, + State3#?STATE.consumers), + ok. + +return_dequeue_delivery_limit_test(_) -> + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, _} = enq(1, 1, msg, Init), + + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + + {State1, {MsgId1, _}} = deq(2, Cid, unsettled, State0), + {State2, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId1]), + State1), + + {State3, {MsgId2, _}} = deq(2, Cid2, unsettled, State2), + {State4, _, _} = apply(meta(4), rabbit_fifo_v0:make_return(Cid2, [MsgId2]), + State3), + ?assertMatch(#{num_messages := 0}, rabbit_fifo_v0:overview(State4)), + ok. + +return_non_existent_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), + % return non-existent + {_State2, _} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [99]), State0), + ok. + +return_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ok. + +return_checked_out_limit_test(_) -> + Cid = {<<"cid">>, self()}, + Init = init(#{name => test, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(test, utf8)), + release_cursor_interval => 0, + delivery_limit => 1}), + {State0, [_, _]} = enq(1, 1, first, Init), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {State2, ok, [{send_msg, _, {delivery, _, [{MsgId2, _}]}, _}, + {aux, active}]} = + apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + {#?STATE{ra_indexes = RaIdxs}, ok, [_ReleaseEff]} = + apply(meta(4), rabbit_fifo_v0:make_return(Cid, [MsgId2]), State2), + ?assertEqual(0, rabbit_fifo_index:size(RaIdxs)), + ok. + +return_auto_checked_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + % it first active then inactive as the consumer took on but cannot take + % any more + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), + % return should include another delivery + {_State2, _, Effects} = apply(meta(3), rabbit_fifo_v0:make_return(Cid, [MsgId]), State1), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, + Effects), + ok. + +cancelled_checkout_out_test(_) -> + Cid = {<<"cid">>, self()}, + {State00, [_, _]} = enq(1, 1, first, test_init(test)), + {State0, [_]} = enq(2, 2, second, State00), + {State1, _} = check_auto(Cid, 2, State0), + % cancelled checkout should not return pending messages to queue + {State2, _, _} = apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, cancel, #{}), State1), + ?assertEqual(1, maps:size(State2#?STATE.messages)), + ?assertEqual(0, lqueue:len(State2#?STATE.returns)), + + {State3, {dequeue, empty}} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), rabbit_fifo_v0:make_settle(Cid, [0]), State3), + + {_State, {dequeue, {_, {_, second}}, _}, _} = + apply(meta(5), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State4), + ok. + +down_with_noproc_consumer_returns_unsettled_test(_) -> + Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, + {State0, [_, _]} = enq(1, 1, second, test_init(test)), + {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), + {_State, Effects} = check(Cid, 4, State2), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + Self = self(), + Node = node(Pid), + {State0, Effects0} = enq(1, 1, second, test_init(test)), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#?STATE.consumers), + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), + % monitor both enqueuer and consumer + % because we received a noconnection we now need to monitor the node + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + #consumer{credit = 1, + checked_out = Ch, + status = suspected_down} = maps:get(Cid, State2a#?STATE.consumers), + ?assertEqual(#{}, Ch), + %% validate consumer has credit + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), + ?ASSERT_EFF({monitor, node, _}, Effects2), + ?assertNoEffect({demonitor, process, _}, Effects2), + % when the node comes up we need to retry the process monitors for the + % disconnected processes + {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + #consumer{status = up} = maps:get(Cid, State3#?STATE.consumers), + % try to re-monitor the suspect processes + ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), + ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), + ok. + +down_with_noconnection_returns_unack_test(_) -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#?STATE.messages)), + ?assertEqual(0, lqueue:len(State0#?STATE.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#?STATE.messages)), + ?assertEqual(0, lqueue:len(State1#?STATE.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + ?assertEqual(0, maps:size(State2a#?STATE.messages)), + ?assertEqual(1, lqueue:len(State2a#?STATE.returns)), + ?assertMatch(#consumer{checked_out = Ch, + status = suspected_down} + when map_size(Ch) == 0, + maps:get(Cid, State2a#?STATE.consumers)), + ok. + +down_with_noproc_enqueuer_is_cleaned_up_test(_) -> + State00 = test_init(test), + Pid = spawn(fun() -> ok end), + {State0, _, Effects0} = apply(meta(1), rabbit_fifo_v0:make_enqueue(Pid, 1, first), State00), + ?ASSERT_EFF({monitor, process, _}, Effects0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), + % ensure there are no enqueuers + ?assert(0 =:= maps:size(State1#?STATE.enqueuers)), + ok. + +discarded_message_without_dead_letter_handler_is_removed_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + {State0, [_, _]} = enq(1, 1, first, test_init(test)), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), + rabbit_fifo_v0:make_discard(Cid, [0]), State1), + ?assertNoEffect({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects2), + ok. + +discarded_message_with_dead_letter_handler_emits_log_effect_test(_) -> + Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, + State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + dead_letter_handler => + {somemod, somefun, [somearg]}}), + {State0, [_, _]} = enq(1, 1, first, State00), + {State1, Effects1} = check_n(Cid, 2, 10, State0), + ?ASSERT_EFF({send_msg, _, + {delivery, _, [{0, {_, first}}]}, _}, + Effects1), + {_State2, _, Effects2} = apply(meta(1), rabbit_fifo_v0:make_discard(Cid, [0]), State1), + % assert mod call effect with appended reason and message + ?ASSERT_EFF({log, _RaftIdxs, _}, Effects2), + ok. + +tick_test(_) -> + Cid = {<<"c">>, self()}, + Cid2 = {<<"c2">>, self()}, + {S0, _} = enq(1, 1, <<"fst">>, test_init(?FUNCTION_NAME)), + {S1, _} = enq(2, 2, <<"snd">>, S0), + {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), + {S3, {_, _}} = deq(4, Cid2, unsettled, S2), + {S4, _, _} = apply(meta(5), rabbit_fifo_v0:make_return(Cid, [MsgId]), S3), + + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}, + [_Node] + ]}] = rabbit_fifo_v0:tick(1, S4), + ok. + + +delivery_query_returns_deliveries_test(_) -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + rabbit_fifo_v0:make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + rabbit_fifo_v0:make_enqueue(self(), 1, one), + rabbit_fifo_v0:make_enqueue(self(), 2, two), + rabbit_fifo_v0:make_enqueue(self(), 3, tre), + rabbit_fifo_v0:make_enqueue(self(), 4, for) + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(help), Entries), + % 3 deliveries are returned + [{0, {_, one}}] = rabbit_fifo_v0:get_checked_out(Cid, 0, 0, State), + [_, _, _] = rabbit_fifo_v0:get_checked_out(Cid, 1, 3, State), + ok. + +pending_enqueue_is_enqueued_on_down_test(_) -> + Cid = {<<"cid">>, self()}, + Pid = self(), + {State0, _} = enq(1, 2, first, test_init(test)), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}, 0}, _} = + apply(meta(3), rabbit_fifo_v0:make_checkout(Cid, {dequeue, settled}, #{}), State1), + ok. + +duplicate_delivery_test(_) -> + {State0, _} = enq(1, 1, first, test_init(test)), + {#?STATE{ra_indexes = RaIdxs, + messages = Messages}, _} = enq(2, 1, first, State0), + ?assertEqual(1, rabbit_fifo_index:size(RaIdxs)), + ?assertEqual(1, maps:size(Messages)), + ok. + +state_enter_file_handle_leader_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), + become_leader_handler => {m, f, [a]}}), + + Resource = {resource, <<"/">>, queue, <<"test">>}, + Effects = rabbit_fifo_v0:state_enter(leader, S0), + ?assertEqual([ + {mod_call, m, f, [a, the_name]}, + {mod_call, rabbit_quorum_queue, file_handle_leader_reservation, [Resource]} + ], Effects), + ok. + +state_enter_file_handle_other_reservation_test(_) -> + S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + Effects = rabbit_fifo_v0:state_enter(other, S0), + ?assertEqual([ + {mod_call, rabbit_quorum_queue, file_handle_other_reservation, []} + ], + Effects), + ok. + +state_enter_monitors_and_notifications_test(_) -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), + Self = self(), + Effects = rabbit_fifo_v0:state_enter(leader, State), + + %% monitor all enqueuers and consumers + [{monitor, process, Self}, + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), + ok. + +purge_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State1, _} = enq(1, 1, first, test_init(test)), + {State2, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State1), + {State3, _} = enq(3, 2, second, State2), + % get returns a reply value + {_State4, {dequeue, {0, {_, second}}, _}, [{monitor, _, _}]} = + apply(meta(4), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), State3), + ok. + +purge_with_checkout_test(_) -> + Cid = {<<"purge_test">>, self()}, + {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), + {State1, _} = enq(2, 1, <<"first">>, State0), + {State2, _} = enq(3, 2, <<"second">>, State1), + %% assert message bytes are non zero + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assert(State2#?STATE.msg_bytes_enqueue > 0), + {State3, {purge, 1}, _} = apply(meta(2), rabbit_fifo_v0:make_purge(), State2), + ?assert(State2#?STATE.msg_bytes_checkout > 0), + ?assertEqual(0, State3#?STATE.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#?STATE.ra_indexes)), + #consumer{checked_out = Checked} = maps:get(Cid, State3#?STATE.consumers), + ?assertEqual(1, maps:size(Checked)), + ok. + +down_noproc_returns_checked_out_in_order_test(_) -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#?STATE.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), + Returns = lqueue:to_list(S#?STATE.returns), + ?assertEqual(100, length(Returns)), + ?assertEqual(0, maps:size(S#?STATE.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +down_noconnection_returns_checked_out_test(_) -> + S0 = test_init(?FUNCTION_NAME), + NumMsgs = 20, + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, NumMsgs)), + ?assertEqual(NumMsgs, maps:size(S1#?STATE.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#?STATE.consumers), + ?assertEqual(NumMsgs, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2), + Returns = lqueue:to_list(S#?STATE.returns), + ?assertEqual(NumMsgs, length(Returns)), + ?assertMatch(#consumer{checked_out = Ch} + when map_size(Ch) == 0, + maps:get(Cid, S#?STATE.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +single_active_consumer_basic_get_test(_) -> + Cid = {?FUNCTION_NAME, self()}, + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), + {State1, _} = enq(1, 1, first, State0), + {_State, {error, unsupported}} = + apply(meta(2), rabbit_fifo_v0:make_checkout(Cid, {dequeue, unsettled}, #{}), + State1), + ok. + +single_active_consumer_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + ?assertEqual(single_active, State0#?STATE.cfg#cfg.consumer_strategy), + ?assertEqual(0, map_size(State0#?STATE.consumers)), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + C3 = {<<"ctag3">>, self()}, + C4 = {<<"ctag4">>, self()}, + + % the first registered consumer is the active one, the others are waiting + ?assertEqual(1, map_size(State1#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State1#?STATE.consumers), + ?assertEqual(3, length(State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#?STATE.waiting_consumers)), + + % cancelling a waiting consumer + {State2, _, Effects1} = apply(meta(2), + make_checkout(C3, cancel, #{}), + State1), + % the active consumer should still be in place + ?assertEqual(1, map_size(State2#?STATE.consumers)), + ?assertMatch(#{C1 := _}, State2#?STATE.consumers), + % the cancelled consumer has been removed from waiting consumers + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#?STATE.waiting_consumers)), + % there are some effects to unregister the consumer + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects1), + + % cancelling the active consumer + {State3, _, Effects2} = apply(meta(3), + make_checkout(C1, cancel, #{}), + State2), + % the second registered consumer is now the active one + ?assertEqual(1, map_size(State3#?STATE.consumers)), + ?assertMatch(#{C2 := _}, State3#?STATE.consumers), + % the new active consumer is no longer in the waiting list + ?assertEqual(1, length(State3#?STATE.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, + State3#?STATE.waiting_consumers)), + %% should have a cancel consumer handler mod_call effect and + %% an active new consumer effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), + + % cancelling the active consumer + {State4, _, Effects3} = apply(meta(4), + make_checkout(C2, cancel, #{}), + State3), + % the last waiting consumer became the active one + ?assertEqual(1, map_size(State4#?STATE.consumers)), + ?assertMatch(#{C4 := _}, State4#?STATE.consumers), + % the waiting consumer list is now empty + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), + % there are some effects to unregister the consumer and + % to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects3), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects3), + + % cancelling the last consumer + {State5, _, Effects4} = apply(meta(5), + make_checkout(C4, cancel, #{}), + State4), + % no active consumer anymore + ?assertEqual(0, map_size(State5#?STATE.consumers)), + % still nothing in the waiting list + ?assertEqual(0, length(State5#?STATE.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, _}, Effects4), + + ok. + +single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + [C1, C2, C3, C4] = Consumers = + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}], + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, Consumers), + + % the channel of the active consumer goes down + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), + % fell back to another consumer + ?assertEqual(1, map_size(State2#?STATE.consumers)), + % there are still waiting consumers + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + % effects to unregister the consumer and + % to update the new active one (metrics) are there + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects), + + % the channel of the active consumer and a waiting consumer goes down + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), + % fell back to another consumer + ?assertEqual(1, map_size(State3#?STATE.consumers)), + % no more waiting consumer + ?assertEqual(0, length(State3#?STATE.waiting_consumers)), + % effects to cancel both consumers of this channel + effect to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), + + % the last channel goes down + {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), + % no more consumers + ?assertEqual(0, map_size(State4#?STATE.consumers)), + ?assertEqual(0, length(State4#?STATE.waiting_consumers)), + % there is an effect to unregister the consumer + queue inactive effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C4, Effects3), + + ok. + +single_active_returns_messages_on_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1], + ConsumerIds = [{_, DownPid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + {State2, _} = enq(4, 1, msg1, State1), + % simulate node goes down + {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2), + %% assert the consumer is up + ?assertMatch([_], lqueue:to_list(State3#?STATE.returns)), + ?assertMatch([{_, #consumer{checked_out = Checked}}] + when map_size(Checked) == 0, + State3#?STATE.waiting_consumers), + + ok. + +single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2, node()], + ConsumerIds = [C1 = {_, DownPid}, C2, _C3] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1a = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, + State1a#?STATE.consumers), + + {State1, _} = enq(10, 1, msg, State1a), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), + + %% assert a new consumer is in place and it is up + ?assertMatch([{C2, #consumer{status = up, + checked_out = Ch}}] + when map_size(Ch) == 1, + maps:to_list(State2#?STATE.consumers)), + + %% the disconnected consumer has been returned to waiting + ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, + State2#?STATE.waiting_consumers)), + ?assertEqual(2, length(State2#?STATE.waiting_consumers)), + + % simulate node comes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), + + %% the consumer is still active and the same as before + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State3#?STATE.consumers)), + % the waiting consumers should be un-suspected + ?assertEqual(2, length(State3#?STATE.waiting_consumers)), + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) + end, State3#?STATE.waiting_consumers), + ok. + +single_active_consumer_all_disconnected_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2], + ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, State1#?STATE.consumers), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), + %% assert the consumer fails over to the consumer on n2 + ?assertMatch(#{C2 := #consumer{status = up}}, State2#?STATE.consumers), + {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), + %% assert these no active consumer after both nodes are maked as down + ?assertMatch([], maps:to_list(State3#?STATE.consumers)), + %% n2 comes back + {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), + %% ensure n2 is the active consumer as this node as been registered + %% as up again + ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, + credit = 1}}], + maps:to_list(State4#?STATE.consumers)), + ok. + +single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => + rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo_v0:state_enter(leader, State1), + %% 2 effects for each consumer process (channel process), 1 effect for the node, + %% 1 effect for file handle reservation + ?assertEqual(2 * 3 + 1 + 1, length(Effects)). + +single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => Resource, + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + Effects = rabbit_fifo_v0:state_enter(eol, State1), + %% 1 effect for each consumer process (channel process), + %% 1 effect for file handle reservation + ?assertEqual(4, length(Effects)). + +query_consumers_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + Consumers0 = State1#?STATE.consumers, + Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), + State2 = State1#?STATE{consumers = Consumers1}, + + ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State2)), + Consumers2 = rabbit_fifo_v0:query_consumers(State2), + ?assertEqual(4, maps:size(Consumers2)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag2">> -> + ?assertNot(Active), + ?assertEqual(suspected_down, ActivityStatus); + _ -> + ?assert(Active), + ?assertEqual(up, ActivityStatus) + end + end, [], Consumers2). + +query_consumers_when_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + % adding some consumers + AddConsumer = fun(CTag, State) -> + {NewState, _, _} = apply( + Meta, + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + + ?assertEqual(4, rabbit_fifo_v0:query_consumer_count(State1)), + Consumers = rabbit_fifo_v0:query_consumers(State1), + ?assertEqual(4, maps:size(Consumers)), + maps:fold(fun(_Key, {Pid, Tag, _, _, Active, ActivityStatus, _, _}, _Acc) -> + ?assertEqual(self(), Pid), + case Tag of + <<"ctag1">> -> + ?assert(Active), + ?assertEqual(single_active, ActivityStatus); + _ -> + ?assertNot(Active), + ?assertEqual(waiting, ActivityStatus) + end + end, [], Consumers). + +active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => false}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = + apply( + #{index => 1}, + rabbit_fifo_v0:make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1), + % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node + ?assertEqual(4 + 1, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID + ?assertEqual(4 + 4, length(Effects3)). + +active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + DummyFunction = fun() -> ok end, + Pid1 = spawn(DummyFunction), + Pid2 = spawn(DummyFunction), + Pid3 = spawn(DummyFunction), + + % adding some consumers + AddConsumer = fun({CTag, ChannelId}, State) -> + {NewState, _, _} = apply( + #{index => 1}, + make_checkout({CTag, ChannelId}, + {once, 1, simple_prefetch}, #{}), + State), + NewState + end, + State1 = lists:foldl(AddConsumer, State0, + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + + {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), + % one monitor and one consumer status update (deactivated) + ?assertEqual(3, length(Effects2)), + + {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), + % for each consumer: 1 effect to monitor the consumer PID + ?assertEqual(5, length(Effects3)). + +single_active_cancelled_with_unacked_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 1, simple_prefetch}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% enqueue 2 messages + {State2, _Effects2} = enq(3, 1, msg1, State1), + {State3, _Effects3} = enq(4, 2, msg2, State2), + %% one should be checked ou to C1 + %% cancel C1 + {State4, _, _} = apply(meta(5), + make_checkout(C1, cancel, #{}), + State3), + %% C2 should be the active consumer + ?assertMatch(#{C2 := #consumer{status = up, + checked_out = #{0 := _}}}, + State4#?STATE.consumers), + %% C1 should be a cancelled consumer + ?assertMatch(#{C1 := #consumer{status = cancelled, + lifetime = once, + checked_out = #{0 := _}}}, + State4#?STATE.consumers), + ?assertMatch([], State4#?STATE.waiting_consumers), + + %% Ack both messages + {State5, _Effects5} = settle(C1, 1, 0, State4), + %% C1 should now be cancelled + {State6, _Effects6} = settle(C2, 2, 0, State5), + + %% C2 should remain + ?assertMatch(#{C2 := #consumer{status = up}}, + State6#?STATE.consumers), + %% C1 should be gone + ?assertNotMatch(#{C1 := _}, + State6#?STATE.consumers), + ?assertMatch([], State6#?STATE.waiting_consumers), + ok. + +single_active_with_credited_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 0, credited}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% add some credit + C1Cred = rabbit_fifo_v0:make_credit(C1, 5, 0, false), + {State2, _, _Effects2} = apply(meta(3), C1Cred, State1), + C2Cred = rabbit_fifo_v0:make_credit(C2, 4, 0, false), + {State3, _} = apply(meta(4), C2Cred, State2), + %% both consumers should have credit + ?assertMatch(#{C1 := #consumer{credit = 5}}, + State3#?STATE.consumers), + ?assertMatch([{C2, #consumer{credit = 4}}], + State3#?STATE.waiting_consumers), + ok. + +purge_nodes_test(_) -> + Node = purged@node, + ThisNode = node(), + EnqPid = test_util:fake_pid(Node), + EnqPid2 = test_util:fake_pid(node()), + ConPid = test_util:fake_pid(Node), + Cid = {<<"tag">>, ConPid}, + % WaitingPid = test_util:fake_pid(Node), + + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + single_active_consumer_on => false}), + {State1, _, _} = apply(meta(1), + rabbit_fifo_v0:make_enqueue(EnqPid, 1, msg1), + State0), + {State2, _, _} = apply(meta(2), + rabbit_fifo_v0:make_enqueue(EnqPid2, 1, msg2), + State1), + {State3, _} = check(Cid, 3, 1000, State2), + {State4, _, _} = apply(meta(4), + {down, EnqPid, noconnection}, + State3), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode, Node] + ]}] , rabbit_fifo_v0:tick(1, State4)), + %% assert there are both enqueuers and consumers + {State, _, _} = apply(meta(5), + rabbit_fifo_v0:make_purge_nodes([Node]), + State4), + + %% assert there are no enqueuers nor consumers + ?assertMatch(#?STATE{enqueuers = Enqs} when map_size(Enqs) == 1, State), + ?assertMatch(#?STATE{consumers = Cons} when map_size(Cons) == 0, State), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode] + ]}] , rabbit_fifo_v0:tick(1, State)), + ok. + +meta(Idx) -> + #{index => Idx, term => 1, + from => {make_ref(), self()}}. + +enq(Idx, MsgSeq, Msg, State) -> + strip_reply( + apply(meta(Idx), rabbit_fifo_v0:make_enqueue(self(), MsgSeq, Msg), State)). + +deq(Idx, Cid, Settlement, State0) -> + {State, {dequeue, {MsgId, Msg}, _}, _} = + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {dequeue, Settlement}, #{}), + State0), + {State, {MsgId, Msg}}. + +check_n(Cid, Idx, N, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, N, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {once, 1, simple_prefetch}, #{}), + State)). + +check_auto(Cid, Idx, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State)). + +check(Cid, Idx, Num, State) -> + strip_reply( + apply(meta(Idx), + rabbit_fifo_v0:make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), + State)). + +settle(Cid, Idx, MsgId, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_settle(Cid, [MsgId]), State)). + +credit(Cid, Idx, Credit, DelCnt, Drain, State) -> + strip_reply(apply(meta(Idx), rabbit_fifo_v0:make_credit(Cid, Credit, DelCnt, Drain), + State)). + +strip_reply({State, _, Effects}) -> + {State, Effects}. + +run_log(InitState, Entries) -> + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> + case apply(meta(Idx), E, Acc0) of + {Acc, _, Efx} when is_list(Efx) -> + {Acc, Efx0 ++ Efx}; + {Acc, _, Efx} -> + {Acc, Efx0 ++ [Efx]}; + {Acc, _} -> + {Acc, Efx0} + end + end, {InitState, []}, Entries). + + +%% AUX Tests + +aux_test(_) -> + _ = ra_machine_ets:start_link(), + Aux0 = init_aux(aux_test), + MacState = init(#{name => aux_test, + queue_resource => + rabbit_misc:r(<<"/">>, queue, <<"test">>)}), + ok = meck:new(ra_log, []), + Log = mock_log, + meck:expect(ra_log, last_index_term, fun (_) -> {0, 0} end), + {no_reply, Aux, mock_log} = handle_aux(leader, cast, active, Aux0, + Log, MacState), + {no_reply, _Aux, mock_log} = handle_aux(leader, cast, tick, Aux, + Log, MacState), + [X] = ets:lookup(rabbit_fifo_usage, aux_test), + meck:unload(), + ?assert(X > 0.0), + ok. + +%% Utility + +init(Conf) -> rabbit_fifo_v0:init(Conf). +apply(Meta, Entry, State) -> rabbit_fifo_v0:apply(Meta, Entry, State). +init_aux(Conf) -> rabbit_fifo_v0:init_aux(Conf). +handle_aux(S, T, C, A, L, M) -> rabbit_fifo_v0:handle_aux(S, T, C, A, L, M). +make_checkout(C, S, M) -> rabbit_fifo_v0:make_checkout(C, S, M). |