summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-15 17:02:24 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-15 17:02:24 +0100
commitd7e03b29549f450f391e5a4aaab287f6cd871a48 (patch)
tree4807a00fd4f064a8372094ba26085658dec25c25
parenta014b2e567576557475eab2b3916a7b1d65ef59f (diff)
downloadrabbitmq-server-d7e03b29549f450f391e5a4aaab287f6cd871a48.tar.gz
Well it doesn't work, but it should eventually...
-rw-r--r--src/rabbit_tests.erl98
1 files changed, 61 insertions, 37 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ed4efb47..10fdd75c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1767,24 +1767,69 @@ msg_id_bin(X) ->
msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
+on_disk_capture() ->
+ on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
+on_disk_capture({OnDisk, Awaiting, Pid}) ->
+ receive
+ {on_disk, MsgIds} when Awaiting =/= undefined ->
+ Awaiting1 = gb_sets:subtract(Awaiting, MsgIds),
+ OnDisk1 = gb_sets:subtract(gb_sets:union(OnDisk, MsgIds), Awaiting),
+ case (not gb_sets:is_empty(Awaiting))
+ andalso gb_sets:is_empty(Awaiting1) of
+ true -> Pid ! {self(), arrived},
+ on_disk_capture({OnDisk1, undefined, undefined});
+ false -> on_disk_capture({OnDisk1, Awaiting1, Pid})
+ end;
+ {on_disk, MsgIds} ->
+ on_disk_capture({gb_sets:union(OnDisk, MsgIds), Awaiting, Pid});
+ {await, MsgIds, Pid} when Awaiting =/= undefined ->
+ OnDisk1 = gb_sets:subtract(OnDisk, MsgIds),
+ Awaiting1 = gb_sets:subtract(MsgIds, OnDisk),
+ case gb_sets:is_empty(Awaiting1) of
+ true -> Pid ! {self(), arrived},
+ on_disk_capture({OnDisk1, undefined, undefined});
+ false -> on_disk_capture({OnDisk1, Awaiting1, Pid})
+ end;
+ stop ->
+ done
+ end.
+
+on_disk_await(Pid, MsgIds) ->
+ Pid ! {await, MsgIds, self()},
+ receive {Pid, arrived} -> ok end.
+
+on_disk_stop(Pid) ->
+ MRef = erlang:monitor(process, Pid),
+ Pid ! stop,
+ receive {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end.
+
+msg_store_client_init_capture(MsgStore, Ref) ->
+ Pid = spawn(fun on_disk_capture/0),
+ {Pid, rabbit_msg_store:client_init(
+ MsgStore, Ref, fun (MsgIds, _ActionTaken) ->
+ Pid ! {on_disk, MsgIds}
+ end, undefined)}.
+
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
fun (MsgId, Atom1) when Atom1 =:= Atom ->
rabbit_msg_store:contains(MsgId, MSCState) end,
Atom, MsgIds).
-msg_store_sync(MsgIds, MSCState) ->
- Ref = make_ref(),
- Self = self(),
- ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end,
- MSCState),
- receive
- {sync, Ref} -> ok
- after
- 10000 ->
- io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]),
- throw(timeout)
- end.
+%% msg_store_sync(MsgIds, MSCState) ->
+%% Ref = make_ref(),
+%% Self = self(),
+%% ok = rabbit_msg_store:sync(MsgIds, fun () -> Self ! {sync, Ref} end,
+%% MSCState),
+%% receive
+%% {sync, Ref} -> ok
+%% after
+%% 10000 ->
+%% io:format("Sync from msg_store missing for msg_ids ~p~n", [MsgIds]),
+%% throw(timeout)
+%% end.
msg_store_read(MsgIds, MSCState) ->
lists:foldl(fun (MsgId, MSCStateM) ->
@@ -1819,22 +1864,18 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
test_msg_store() ->
restart_msg_store_empty(),
- Self = self(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
Ref = rabbit_guid:guid(),
- MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
+ {Cap, MSCState} = msg_store_client_init_capture(?PERSISTENT_MSG_STORE, Ref),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
+ ok = on_disk_await(Cap, MsgIds1stHalf),
%% publish the second half
ok = msg_store_write(MsgIds2ndHalf, MSCState),
- %% sync on the first half again - the msg_store will be dirty, but
- %% we won't need the fsync
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
%% check they're all in there
true = msg_store_contains(true, MsgIds, MSCState),
%% publish the latter half twice so we hit the caching and ref count code
@@ -1843,25 +1884,8 @@ test_msg_store() ->
true = msg_store_contains(true, MsgIds, MSCState),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
- ok = lists:foldl(
- fun (MsgId, ok) -> rabbit_msg_store:sync(
- [MsgId], fun () -> Self ! {sync, MsgId} end,
- MSCState)
- end, ok, MsgIds2ndHalf),
- lists:foldl(
- fun(MsgId, ok) ->
- receive
- {sync, MsgId} -> ok
- after
- 10000 ->
- io:format("Sync from msg_store missing (msg_id: ~p)~n",
- [MsgId]),
- throw(timeout)
- end
- end, ok, MsgIds2ndHalf),
- %% it's very likely we're not dirty here, so the 1st half sync
- %% should hit a different code path
- ok = msg_store_sync(MsgIds1stHalf, MSCState),
+ ok = on_disk_await(Cap, MsgIds2ndHalf),
+ ok = on_disk_stop(Cap),
%% read them all
MSCState1 = msg_store_read(MsgIds, MSCState),
%% read them all again - this will hit the cache, not disk