diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-15 17:02:24 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-08-15 17:02:24 +0100 |
commit | d7e03b29549f450f391e5a4aaab287f6cd871a48 (patch) | |
tree | 4807a00fd4f064a8372094ba26085658dec25c25 | |
parent | a014b2e567576557475eab2b3916a7b1d65ef59f (diff) | |
download | rabbitmq-server-d7e03b29549f450f391e5a4aaab287f6cd871a48.tar.gz |
Well it doesn't work, but it should eventually...
-rw-r--r-- | src/rabbit_tests.erl | 98 |
1 files changed, 61 insertions, 37 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ed4efb47..10fdd75c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1767,24 +1767,69 @@ msg_id_bin(X) -> msg_store_client_init(MsgStore, Ref) -> rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined). +on_disk_capture() -> + on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}). +on_disk_capture({OnDisk, Awaiting, Pid}) -> + receive + {on_disk, MsgIds} when Awaiting =/= undefined -> + Awaiting1 = gb_sets:subtract(Awaiting, MsgIds), + OnDisk1 = gb_sets:subtract(gb_sets:union(OnDisk, MsgIds), Awaiting), + case (not gb_sets:is_empty(Awaiting)) + andalso gb_sets:is_empty(Awaiting1) of + true -> Pid ! {self(), arrived}, + on_disk_capture({OnDisk1, undefined, undefined}); + false -> on_disk_capture({OnDisk1, Awaiting1, Pid}) + end; + {on_disk, MsgIds} -> + on_disk_capture({gb_sets:union(OnDisk, MsgIds), Awaiting, Pid}); + {await, MsgIds, Pid} when Awaiting =/= undefined -> + OnDisk1 = gb_sets:subtract(OnDisk, MsgIds), + Awaiting1 = gb_sets:subtract(MsgIds, OnDisk), + case gb_sets:is_empty(Awaiting1) of + true -> Pid ! {self(), arrived}, + on_disk_capture({OnDisk1, undefined, undefined}); + false -> on_disk_capture({OnDisk1, Awaiting1, Pid}) + end; + stop -> + done + end. + +on_disk_await(Pid, MsgIds) -> + Pid ! {await, MsgIds, self()}, + receive {Pid, arrived} -> ok end. + +on_disk_stop(Pid) -> + MRef = erlang:monitor(process, Pid), + Pid ! stop, + receive {'DOWN', MRef, process, Pid, _Reason} -> + ok + end. + +msg_store_client_init_capture(MsgStore, Ref) -> + Pid = spawn(fun on_disk_capture/0), + {Pid, rabbit_msg_store:client_init( + MsgStore, Ref, fun (MsgIds, _ActionTaken) -> + Pid ! {on_disk, MsgIds} + end, undefined)}. + msg_store_contains(Atom, MsgIds, MSCState) -> Atom = lists:foldl( fun (MsgId, Atom1) when Atom1 =:= Atom -> rabbit_msg_store:contains(MsgId, MSCState) end, Atom, MsgIds). -msg_store_sync(MsgIds, MSCState) -> - Ref = make_ref(), - Self = self(), - ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end, - MSCState), - receive - {sync, Ref} -> ok - after - 10000 -> - io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), - throw(timeout) - end. +%% msg_store_sync(MsgIds, MSCState) -> +%% Ref = make_ref(), +%% Self = self(), +%% ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end, +%% MSCState), +%% receive +%% {sync, Ref} -> ok +%% after +%% 10000 -> +%% io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]), +%% throw(timeout) +%% end. msg_store_read(MsgIds, MSCState) -> lists:foldl(fun (MsgId, MSCStateM) -> @@ -1819,22 +1864,18 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> test_msg_store() -> restart_msg_store_empty(), - Self = self(), MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)], {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), Ref = rabbit_guid:guid(), - MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref), + {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, MsgIds, MSCState), %% publish the first half ok = msg_store_write(MsgIds1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds1stHalf), %% publish the second half ok = msg_store_write(MsgIds2ndHalf, MSCState), - %% sync on the first half again - the msg_store will be dirty, but - %% we won't need the fsync - ok = msg_store_sync(MsgIds1stHalf, MSCState), %% check they're all in there true = msg_store_contains(true, MsgIds, MSCState), %% publish the latter half twice so we hit the caching and ref count code @@ -1843,25 +1884,8 @@ test_msg_store() -> true = msg_store_contains(true, MsgIds, MSCState), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen - ok = lists:foldl( - fun (MsgId, ok) -> rabbit_msg_store:sync( - [MsgId], fun () -> Self ! {sync, MsgId} end, - MSCState) - end, ok, MsgIds2ndHalf), - lists:foldl( - fun(MsgId, ok) -> - receive - {sync, MsgId} -> ok - after - 10000 -> - io:format("Sync from msg_store missing (msg_id: ~p)~n", - [MsgId]), - throw(timeout) - end - end, ok, MsgIds2ndHalf), - %% it's very likely we're not dirty here, so the 1st half sync - %% should hit a different code path - ok = msg_store_sync(MsgIds1stHalf, MSCState), + ok = on_disk_await(Cap, MsgIds2ndHalf), + ok = on_disk_stop(Cap), %% read them all MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk |