summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-20 17:22:43 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-20 17:22:43 +0000
commit9c43ce8583aba0344a02187573edb3958409499a (patch)
tree54d9fe9d64ea6befe46051e0f7b3de145ca1f242
parent69d1c25e9139d2e8e7121a72af73a43ad1c90726 (diff)
downloadrabbitmq-server-bug22161.tar.gz
Ensure that :contains calls can't overtake :remove casts. Also modify the tests slightly.bug22161
-rw-r--r--src/rabbit_msg_store.erl66
-rw-r--r--src/rabbit_tests.erl25
2 files changed, 55 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f40c6270..c060c8d4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -87,7 +87,7 @@
sum_valid_data, %% sum of valid data in all files
sum_file_size, %% sum of file sizes
pending_gc_completion, %% things to do once GC completes
- gc_running %% is the GC currently working?
+ gc_active %% is the GC currently working?
}).
-include("rabbit_msg_store.hrl").
@@ -266,7 +266,7 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
sum_valid_data = 0,
sum_file_size = 0,
pending_gc_completion = [],
- gc_running = false
+ gc_active = false
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -295,17 +295,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({read, MsgId}, From, State) ->
- case read_message(MsgId, State) of
- {ok, Msg, State1} -> reply({ok, Msg}, State1);
- {blocked, State1} -> noreply(add_to_pending_gc_completion(
- {read, MsgId, From}, State1))
- end;
+ State1 = read_message(MsgId, From, State),
+ noreply(State1);
-handle_call({contains, MsgId}, _From, State) ->
- reply(case index_lookup(MsgId, State) of
- not_found -> false;
- #msg_location {} -> true
- end, State).
+handle_call({contains, MsgId}, From, State) ->
+ State1 = contains_message(MsgId, From, State),
+ noreply(State1).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
@@ -383,7 +378,7 @@ handle_cast(sync, State) ->
handle_cast({gc_done, Reclaimed, Source, Dest},
State = #msstate { sum_file_size = SumFileSize,
- gc_running = true,
+ gc_active = {Source, Dest},
file_summary = FileSummary }) ->
%% we always move data left, so Source has gone and was on the
%% right, so need to make dest = source.right.left, and also
@@ -399,7 +394,7 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
true = ets:delete(FileSummary, Source),
noreply(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_running = false })).
+ gc_active = false })).
handle_info(timeout, State) ->
noreply(sync(State));
@@ -485,12 +480,13 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-read_message(MsgId, State =
+read_message(MsgId, From, State =
#msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary = FileSummary }) ->
case index_lookup(MsgId, State) of
- not_found -> {ok, not_found, State};
+ not_found -> gen_server2:reply(From, not_found),
+ State;
#msg_location { ref_count = RefCount,
file = File,
offset = Offset,
@@ -501,7 +497,8 @@ read_message(MsgId, State =
ets:lookup(FileSummary, File),
case Locked of
true ->
- {blocked, State};
+ add_to_pending_gc_completion({read, MsgId, From},
+ State);
false ->
ok = case CurFile =:= File andalso {ok, Offset} >=
file_handle_cache:current_raw_offset(
@@ -535,10 +532,28 @@ read_message(MsgId, State =
%% cache.
ok
end,
- {ok, Msg, State1}
+ gen_server2:reply(From, {ok, Msg}),
+ State1
end;
{Msg, _RefCount} ->
- {ok, Msg, State}
+ gen_server2:reply(From, {ok, Msg}),
+ State
+ end
+ end.
+
+contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
+ case index_lookup(MsgId, State) of
+ not_found ->
+ gen_server2:reply(From, false),
+ State;
+ #msg_location { file = File } ->
+ case GCActive of
+ {A, B} when File == A orelse File == B ->
+ add_to_pending_gc_completion(
+ {contains, MsgId, From}, State);
+ _ ->
+ gen_server2:reply(From, true),
+ State
end
end.
@@ -588,12 +603,9 @@ run_pending(State = #msstate { pending_gc_completion = Pending }) ->
lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
run_pending({read, MsgId, From}, State) ->
- case read_message(MsgId, State) of
- {ok, Msg, State1} -> gen_server2:reply(From, {ok, Msg}),
- State1;
- {blocked, State1} -> add_to_pending_gc_completion(
- {read, MsgId, From}, State1)
- end;
+ read_message(MsgId, From, State);
+run_pending({contains, MsgId, From}, State) ->
+ contains_message(MsgId, From, State);
run_pending({remove, MsgId}, State) ->
remove_message(MsgId, State).
@@ -924,7 +936,7 @@ maybe_roll_to_new_file(_, State) ->
maybe_compact(State = #msstate { sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary = FileSummary,
- gc_running = false })
+ gc_active = false })
when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
First = ets:first(FileSummary),
N = random_distributions:geometric(?GEOMETRIC_P),
@@ -938,7 +950,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
true = ets:update_element(FileSummary, Dest,
{#file_summary.locked, true}),
ok = rabbit_msg_store_gc:gc(Source, Dest),
- State1 #msstate { gc_running = true }
+ State1 #msstate { gc_active = {Source, Dest} }
end;
maybe_compact(State) ->
State.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7187e322..f5d7978c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -921,7 +921,7 @@ msg_store_write(MsgIds) ->
ok = lists:foldl(
fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end,
ok, MsgIds).
-
+
test_msg_store() ->
stop_msg_store(),
ok = start_msg_store_empty(),
@@ -1016,16 +1016,23 @@ test_msg_store() ->
fun (MsgId, ok) ->
rabbit_msg_store:write(msg_id_bin(MsgId), Payload)
end, ok, MsgIdsBig),
- %% .., then remove even numbers ascending, and odd numbers
- %% descending. This hits the GC.
+ %% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove([msg_id_bin(
- case MsgId rem 2 of
- 0 -> MsgId;
- 1 -> BigCount - MsgId
- end)])
- end, ok, MsgIdsBig),
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount, 1, -3)),
+ %% .., then remove 3s by 2, from the young end first. This hits
+ %% GC (under 50% good data left, but no empty files. Must GC).
+ ok = lists:foldl(
+ fun (MsgId, ok) ->
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount-1, 1, -3)),
+ %% .., then remove 3s by 3, from the young end first. This hits
+ %% GC...
+ ok = lists:foldl(
+ fun (MsgId, ok) ->
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]),
%% restart empty