diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-12-20 17:22:43 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-12-20 17:22:43 +0000 |
commit | 9c43ce8583aba0344a02187573edb3958409499a (patch) | |
tree | 54d9fe9d64ea6befe46051e0f7b3de145ca1f242 | |
parent | 69d1c25e9139d2e8e7121a72af73a43ad1c90726 (diff) | |
download | rabbitmq-server-bug22161.tar.gz |
Ensure that :contains calls can't overtake :remove casts. Also modify the tests slightly.bug22161
-rw-r--r-- | src/rabbit_msg_store.erl | 66 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 25 |
2 files changed, 55 insertions, 36 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f40c6270..c060c8d4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -87,7 +87,7 @@ sum_valid_data, %% sum of valid data in all files sum_file_size, %% sum of file sizes pending_gc_completion, %% things to do once GC completes - gc_running %% is the GC currently working? + gc_active %% is the GC currently working? }). -include("rabbit_msg_store.hrl"). @@ -266,7 +266,7 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> sum_valid_data = 0, sum_file_size = 0, pending_gc_completion = [], - gc_running = false + gc_active = false }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -295,17 +295,12 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call({read, MsgId}, From, State) -> - case read_message(MsgId, State) of - {ok, Msg, State1} -> reply({ok, Msg}, State1); - {blocked, State1} -> noreply(add_to_pending_gc_completion( - {read, MsgId, From}, State1)) - end; + State1 = read_message(MsgId, From, State), + noreply(State1); -handle_call({contains, MsgId}, _From, State) -> - reply(case index_lookup(MsgId, State) of - not_found -> false; - #msg_location {} -> true - end, State). +handle_call({contains, MsgId}, From, State) -> + State1 = contains_message(MsgId, From, State), + noreply(State1). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, @@ -383,7 +378,7 @@ handle_cast(sync, State) -> handle_cast({gc_done, Reclaimed, Source, Dest}, State = #msstate { sum_file_size = SumFileSize, - gc_running = true, + gc_active = {Source, Dest}, file_summary = FileSummary }) -> %% we always move data left, so Source has gone and was on the %% right, so need to make dest = source.right.left, and also @@ -399,7 +394,7 @@ handle_cast({gc_done, Reclaimed, Source, Dest}, true = ets:delete(FileSummary, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_running = false })). + gc_active = false })). handle_info(timeout, State) -> noreply(sync(State)); @@ -485,12 +480,13 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -read_message(MsgId, State = +read_message(MsgId, From, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, file_summary = FileSummary }) -> case index_lookup(MsgId, State) of - not_found -> {ok, not_found, State}; + not_found -> gen_server2:reply(From, not_found), + State; #msg_location { ref_count = RefCount, file = File, offset = Offset, @@ -501,7 +497,8 @@ read_message(MsgId, State = ets:lookup(FileSummary, File), case Locked of true -> - {blocked, State}; + add_to_pending_gc_completion({read, MsgId, From}, + State); false -> ok = case CurFile =:= File andalso {ok, Offset} >= file_handle_cache:current_raw_offset( @@ -535,10 +532,28 @@ read_message(MsgId, State = %% cache. ok end, - {ok, Msg, State1} + gen_server2:reply(From, {ok, Msg}), + State1 end; {Msg, _RefCount} -> - {ok, Msg, State} + gen_server2:reply(From, {ok, Msg}), + State + end + end. + +contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> + case index_lookup(MsgId, State) of + not_found -> + gen_server2:reply(From, false), + State; + #msg_location { file = File } -> + case GCActive of + {A, B} when File == A orelse File == B -> + add_to_pending_gc_completion( + {contains, MsgId, From}, State); + _ -> + gen_server2:reply(From, true), + State end end. @@ -588,12 +603,9 @@ run_pending(State = #msstate { pending_gc_completion = Pending }) -> lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). run_pending({read, MsgId, From}, State) -> - case read_message(MsgId, State) of - {ok, Msg, State1} -> gen_server2:reply(From, {ok, Msg}), - State1; - {blocked, State1} -> add_to_pending_gc_completion( - {read, MsgId, From}, State1) - end; + read_message(MsgId, From, State); +run_pending({contains, MsgId, From}, State) -> + contains_message(MsgId, From, State); run_pending({remove, MsgId}, State) -> remove_message(MsgId, State). @@ -924,7 +936,7 @@ maybe_roll_to_new_file(_, State) -> maybe_compact(State = #msstate { sum_valid_data = SumValid, sum_file_size = SumFileSize, file_summary = FileSummary, - gc_running = false }) + gc_active = false }) when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> First = ets:first(FileSummary), N = random_distributions:geometric(?GEOMETRIC_P), @@ -938,7 +950,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, true = ets:update_element(FileSummary, Dest, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:gc(Source, Dest), - State1 #msstate { gc_running = true } + State1 #msstate { gc_active = {Source, Dest} } end; maybe_compact(State) -> State. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7187e322..f5d7978c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -921,7 +921,7 @@ msg_store_write(MsgIds) -> ok = lists:foldl( fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end, ok, MsgIds). - + test_msg_store() -> stop_msg_store(), ok = start_msg_store_empty(), @@ -1016,16 +1016,23 @@ test_msg_store() -> fun (MsgId, ok) -> rabbit_msg_store:write(msg_id_bin(MsgId), Payload) end, ok, MsgIdsBig), - %% .., then remove even numbers ascending, and odd numbers - %% descending. This hits the GC. + %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove([msg_id_bin( - case MsgId rem 2 of - 0 -> MsgId; - 1 -> BigCount - MsgId - end)]) - end, ok, MsgIdsBig), + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount, 1, -3)), + %% .., then remove 3s by 2, from the young end first. This hits + %% GC (under 50% good data left, but no empty files. Must GC). + ok = lists:foldl( + fun (MsgId, ok) -> + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount-1, 1, -3)), + %% .., then remove 3s by 3, from the young end first. This hits + %% GC... + ok = lists:foldl( + fun (MsgId, ok) -> + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]), %% restart empty |