summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-03-02 11:01:23 +0000
committerkjnilsson <knilsson@pivotal.io>2020-03-02 11:01:23 +0000
commit9ab6c49f78683d076cad2e30b379f56431a55ce1 (patch)
tree753ce76580e7e3bf8afd865379556eea4c9b73d8
parentf008fb3d463e07fbd649f316e98fcf08e4904168 (diff)
downloadrabbitmq-server-git-9ab6c49f78683d076cad2e30b379f56431a55ce1.tar.gz
rabbit_fifo: release cursor effect ordering
Ensure release cursors are added to the end of the effects list rather than the front as there may be effects such as log effects that require log entries that may be truncated by the release cursor effect to be present.
-rw-r--r--src/rabbit_fifo.erl9
-rw-r--r--test/quorum_queue_SUITE.erl4
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl5
3 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 762963419c..d0b56961ec 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1103,7 +1103,7 @@ return(#{index := IncomingRaftIdx} = Meta, ConsumerId, Returned,
{Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects1),
State2 = State1#?MODULE{consumers = Cons,
- service_queue = SQ},
+ service_queue = SQ},
{State, ok, Effects} = checkout(Meta, State2, Effects2),
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
@@ -1153,7 +1153,6 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
{State2, Effects1} = complete(ConsumerId, Discarded, Con0,
Effects0, State0),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
- % settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Reason, _Discarded,
@@ -1199,8 +1198,7 @@ update_smallest_raft_index(IncomingRaftIdx,
% we can forward release_cursor all the way until
% the last received command, hooray
State = State0#?MODULE{release_cursors = lqueue:new()},
- {State, ok,
- [{release_cursor, IncomingRaftIdx, State} | Effects]};
+ {State, ok, Effects ++ [{release_cursor, IncomingRaftIdx, State}]};
_ ->
Smallest = rabbit_fifo_index:smallest(Indexes),
case find_next_cursor(Smallest, Cursors0) of
@@ -1211,7 +1209,7 @@ update_smallest_raft_index(IncomingRaftIdx,
%% we can emit a release cursor we've passed the smallest
%% release cursor available.
{State0#?MODULE{release_cursors = Cursors}, ok,
- [Cursor | Effects]}
+ Effects ++ [Cursor]}
end
end.
@@ -1315,7 +1313,6 @@ return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
end, {State, Effects0}, Checked).
%% checkout new messages to consumers
-%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, {#{}, #{}}),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 0fd76dd0fb..c340c76dda 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1431,7 +1431,7 @@ delete_member_not_a_member(Config) ->
[<<"/">>, QQ, Server])).
delete_member_during_node_down(Config) ->
- [Server, DownServer, _] = rabbit_ct_broker_helpers:get_node_configs(
+ [Server, DownServer, Remove] = rabbit_ct_broker_helpers:get_node_configs(
Config, nodename),
stop_node(Config, DownServer),
@@ -1441,7 +1441,7 @@ delete_member_during_node_down(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
timer:sleep(200),
?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member,
- [<<"/">>, QQ, Server])),
+ [<<"/">>, QQ, Remove])),
rabbit_ct_broker_helpers:start_node(Config, DownServer),
?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, repair_amqqueue_nodes,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index d79dac5b6f..0cd502c403 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -605,7 +605,8 @@ in_memory_limit(_Config) ->
Size = 2000,
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, InMemoryBytes},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit,
+ InMemoryLength, InMemoryBytes},
frequency([{10, {0, 0, false, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
@@ -665,6 +666,8 @@ in_memory_limit_prop(Conf0, Commands) ->
false
end.
+validate_idx_order([], _ReleaseCursorIdx) ->
+ true;
validate_idx_order(Idxs, ReleaseCursorIdx) ->
Min = lists:min(Idxs),
case Min < ReleaseCursorIdx of