summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-10-12 15:17:52 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-10-12 15:17:52 +0100
commite680875be3709a7b90f6f278e8308097e59ed268 (patch)
tree6411324045efed9fc764227efdeb2ad2d0dc8987
parent3581b0dfcf1a7cf78f5a1b27486644ddbd2212a2 (diff)
parenta483e87b2fbf3fbb000b3fb6097d93dc616ce012 (diff)
downloadrabbitmq-server-e680875be3709a7b90f6f278e8308097e59ed268.tar.gz
Merge bug24477
-rw-r--r--src/rabbit_msg_store.erl5
-rw-r--r--src/rabbit_tests.erl68
2 files changed, 56 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fc3cbebd..e4691b81 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1103,9 +1103,10 @@ record_pending_confirm(CRef, MsgId, State) ->
client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(MsgIds, ActionTaken),
case dict:find(CRef, CTM) of
- {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ {ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
+ ActionTaken),
+ MsgIds1 = gb_sets:difference(Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
true -> dict:erase(CRef, CTM);
false -> dict:store(CRef, MsgIds1, CTM)
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d5dafd64..5e034ae7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1837,27 +1837,34 @@ msg_store_client_init(MsgStore, Ref) ->
rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined).
on_disk_capture() ->
- on_disk_capture({gb_sets:new(), gb_sets:new(), undefined}).
-on_disk_capture({OnDisk, Awaiting, Pid}) ->
- Pid1 = case Pid =/= undefined andalso gb_sets:is_empty(Awaiting) of
- true -> Pid ! {self(), arrived}, undefined;
- false -> Pid
- end,
receive
- {await, MsgIds, Pid2} ->
- true = Pid1 =:= undefined andalso gb_sets:is_empty(Awaiting),
- on_disk_capture({OnDisk, gb_sets:subtract(MsgIds, OnDisk), Pid2});
- {on_disk, MsgIds} ->
- on_disk_capture({gb_sets:union(OnDisk, MsgIds),
- gb_sets:subtract(Awaiting, MsgIds),
- Pid1});
+ {await, MsgIds, Pid} -> on_disk_capture([], MsgIds, Pid);
+ stop -> done
+ end.
+
+on_disk_capture(OnDisk, Awaiting, Pid) ->
+ receive
+ {on_disk, MsgIdsS} ->
+ MsgIds = gb_sets:to_list(MsgIdsS),
+ on_disk_capture(OnDisk ++ (MsgIds -- Awaiting), Awaiting -- MsgIds,
+ Pid);
stop ->
done
+ after 100 ->
+ case {OnDisk, Awaiting} of
+ {[], []} -> Pid ! {self(), arrived}, on_disk_capture();
+ {_, []} -> Pid ! {self(), surplus};
+ {[], _} -> Pid ! {self(), timeout};
+ {_, _} -> Pid ! {self(), surplus_timeout}
+ end
end.
on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
- Pid ! {await, gb_sets:from_list(MsgIds), self()},
- receive {Pid, arrived} -> ok end.
+ Pid ! {await, MsgIds, self()},
+ receive
+ {Pid, arrived} -> ok;
+ {Pid, Error} -> Error
+ end.
on_disk_stop(Pid) ->
MRef = erlang:monitor(process, Pid),
@@ -1922,6 +1929,8 @@ test_msg_store() ->
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
+ %% test confirm logic
+ passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
%% publish the first half
ok = msg_store_write(MsgIds1stHalf, MSCState),
%% sync on the first half
@@ -2033,6 +2042,35 @@ test_msg_store() ->
restart_msg_store_empty(),
passed.
+%% We want to test that writes that get eliminated due to removes still
+%% get confirmed. Removes themselves do not.
+test_msg_store_confirms(MsgIds, Cap, MSCState) ->
+ %% write -> confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% remove -> _
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, []),
+ %% write, remove -> confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% write, remove, write -> confirmed, confirmed
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds ++ MsgIds),
+ %% remove, write -> confirmed
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ %% remove, write, remove -> confirmed
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_write(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds, MSCState),
+ ok = on_disk_await(Cap, MsgIds),
+ passed.
+
queue_name(Name) ->
rabbit_misc:r(<<"/">>, queue, Name).