diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-03-02 11:01:23 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-03-02 11:01:23 +0000 |
commit | 9ab6c49f78683d076cad2e30b379f56431a55ce1 (patch) | |
tree | 753ce76580e7e3bf8afd865379556eea4c9b73d8 | |
parent | f008fb3d463e07fbd649f316e98fcf08e4904168 (diff) | |
download | rabbitmq-server-git-9ab6c49f78683d076cad2e30b379f56431a55ce1.tar.gz |
rabbit_fifo: release cursor effect ordering
Ensure release cursors are added to the end of the effects list rather
than the front as there may be effects such as log effects that require
log entries that may be truncated by the release cursor effect to be
present.
-rw-r--r-- | src/rabbit_fifo.erl | 9 | ||||
-rw-r--r-- | test/quorum_queue_SUITE.erl | 4 | ||||
-rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 5 |
3 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 762963419c..d0b56961ec 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1103,7 +1103,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned, {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects1), State2 = State1#?MODULE{consumers = Cons, - service_queue = SQ}, + service_queue = SQ}, {State, ok, Effects} = checkout(Meta, State2, Effects2), update_smallest_raft_index(IncomingRaftIdx, State, Effects). @@ -1153,7 +1153,6 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, {State2, Effects1} = complete(ConsumerId, Discarded, Con0, Effects0, State0), {State, ok, Effects} = checkout(Meta, State2, Effects1), - % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Reason, _Discarded, @@ -1199,8 +1198,7 @@ update_smallest_raft_index(IncomingRaftIdx, % we can forward release_cursor all the way until % the last received command, hooray State = State0#?MODULE{release_cursors = lqueue:new()}, - {State, ok, - [{release_cursor, IncomingRaftIdx, State} | Effects]}; + {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]}; _ -> Smallest = rabbit_fifo_index:smallest(Indexes), case find_next_cursor(Smallest, Cursors0) of @@ -1211,7 +1209,7 @@ update_smallest_raft_index(IncomingRaftIdx, %% we can emit a release cursor we've passed the smallest %% release cursor available. {State0#?MODULE{release_cursors = Cursors}, ok, - [Cursor | Effects]} + Effects ++ [Cursor]} end end. @@ -1315,7 +1313,6 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, end, {State, Effects0}, Checked). %% checkout new messages to consumers -%% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, {#{}, #{}}), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 0fd76dd0fb..c340c76dda 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1431,7 +1431,7 @@ delete_member_not_a_member(Config) -> [<<"/">>, QQ, Server])). delete_member_during_node_down(Config) -> - [Server, DownServer, _] = rabbit_ct_broker_helpers:get_node_configs( + [Server, DownServer, Remove] = rabbit_ct_broker_helpers:get_node_configs( Config, nodename), stop_node(Config, DownServer), @@ -1441,7 +1441,7 @@ delete_member_during_node_down(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), timer:sleep(200), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, - [<<"/">>, QQ, Server])), + [<<"/">>, QQ, Remove])), rabbit_ct_broker_helpers:start_node(Config, DownServer), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, repair_amqqueue_nodes, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index d79dac5b6f..0cd502c403 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -605,7 +605,8 @@ in_memory_limit(_Config) -> Size = 2000, run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, InMemoryBytes}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, + InMemoryLength, InMemoryBytes}, frequency([{10, {0, 0, false, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), @@ -665,6 +666,8 @@ in_memory_limit_prop(Conf0, Commands) -> false end. +validate_idx_order([], _ReleaseCursorIdx) -> + true; validate_idx_order(Idxs, ReleaseCursorIdx) -> Min = lists:min(Idxs), case Min < ReleaseCursorIdx of |