summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-11 22:40:22 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-11 22:40:22 +0100
commitb7f51c4a544aa5f2ebf4c5618023fc3933232cb6 (patch)
tree822be320f99b94f300cab126e294ef15ccc01a39
parent3581b0dfcf1a7cf78f5a1b27486644ddbd2212a2 (diff)
downloadrabbitmq-server-b7f51c4a544aa5f2ebf4c5618023fc3933232cb6.tar.gz
make tests wait for *exactly* the right confirms, no more, no fewer
This does introduce a 100ms pause for every invocation; hopefully enough for the msg_store to catch up and sync (the sync interval is 25ms) the usually small numbers of messages. The confirm capture code now uses lists rather than gb_sets, as a convenient 'bag' data structure. This can have O(n^2) perf, but we only need to start worrying about that when the tests call this code with thousands of msg ids, which currently they don't.
-rw-r--r--src/rabbit_tests.erl37
1 files changed, 22 insertions, 15 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d5dafd64..b8f3694d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
on_disk_capture() ->
- on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
-on_disk_capture({OnDisk, Awaiting, Pid}) ->
- Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of
- true -> Pid ! {self(), arrived}, undefined;
- false -> Pid
- end,
receive
- {await, MsgIds, Pid2} ->
- true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting),
- on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2});
- {on_disk, MsgIds} ->
- on_disk_capture({gb_sets:union(OnDisk, MsgIds),
- gb_sets:subtract(Awaiting, MsgIds),
- Pid1});
+ {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
+ stop -> done
+ end.
+
+on_disk_capture(OnDisk, Awaiting, Pid) ->
+ receive
+ {on_disk, MsgIdsS} ->
+ MsgIds = gb_sets:to_list(MsgIdsS),
+ on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
+ Pid);
stop ->
done
+ after 100 ->
+ case {OnDisk, Awaiting} of
+ {[], []} -> Pid ! {self(), arrived}, on_disk_capture();
+ {_, []} -> Pid ! {self(), surplus};
+ {[], _} -> Pid ! {self(), timeout};
+ {_, _} -> Pid ! {self(), surplus_timeout}
+ end
end.
on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
- Pid ! {await, gb_sets:from_list(MsgIds), self()},
- receive {Pid, arrived} -> ok end.
+ Pid ! {await, MsgIds, self()},
+ receive
+ {Pid, arrived} -> ok;
+ {Pid, Error} -> Error
+ end.
on_disk_stop(Pid) ->
MRef = erlang:monitor(process, Pid),