diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 15:08:51 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-06 15:08:51 +0000 |
commit | f78d7d205df06f4d90cceca928349a7bdb90dec4 (patch) | |
tree | 40ec65cea4c3941ce66b98c0df6442b364c840f0 | |
parent | 5d3b291386560f1d9629be9293f2d624779ef322 (diff) | |
download | rabbitmq-server-f78d7d205df06f4d90cceca928349a7bdb90dec4.tar.gz |
Reimplement or manually merge all past changes for bug 23329 onto the new head of default. In general the code is a bit cleaner, thanks to better APIs within msg_store.
-rw-r--r-- | src/rabbit_msg_store.erl | 158 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 4 |
2 files changed, 117 insertions, 45 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e8b4e8e2..a85de453 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -81,6 +81,7 @@ file_summary_ets, %% tid of the file summary table dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table + dying_clients_ets, %% tid of the dying clients table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? @@ -361,6 +362,7 @@ client_terminate(CState = #client_msstate { client_ref = Ref }) -> client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), + ok = server_cast(CState, {client_dying, Ref}), ok = server_cast(CState, {client_delete, Ref}). client_ref(#client_msstate { client_ref = Ref }) -> Ref. @@ -580,6 +582,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + DyingClientsEts = ets:new(rabbit_msg_store_terminal, [set]), + {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), State = #msstate { dir = Dir, @@ -598,6 +602,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients_ets = DyingClientsEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, @@ -643,6 +648,7 @@ prioritise_cast(Msg, _State) -> {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; {set_maximum_since_use, _Age} -> 8; + {client_dying, _Pid} -> 7; _ -> 0 end. @@ -681,11 +687,19 @@ handle_call({contains, Guid}, From, State) -> State1 = contains_message(Guid, From, State), noreply(State1). 
+handle_cast({client_dying, CRef}, + State = #msstate { dying_clients_ets = DyingClientsEts }) -> + true = ets:insert_new(DyingClientsEts, {CRef, const}), + write_message(CRef, <<>>, State); + handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> + State = #msstate { client_refs = ClientRefs, + dying_clients_ets = DyingClientsEts }) -> + true = ets:delete(DyingClientsEts, CRef), State1 = clear_client_callback(CRef, State), - noreply(State1 #msstate { - client_refs = sets:del_element(CRef, ClientRefs) }); + ClientRefs1 = sets:del_element(CRef, ClientRefs), + noreply(remove_message(CRef, CRef, + State1 #msstate { client_refs = ClientRefs1 })); handle_cast({write, CRef, Guid}, State = #msstate { sum_valid_data = SumValid, @@ -705,23 +719,33 @@ handle_cast({write, CRef, Guid}, error -> CTG end, State1 = State #msstate { cref_to_guids = CTG1 }, - case index_lookup(Guid, State1) of - not_found -> + case should_mask_action(CRef, Guid, State1) of + {true, _Location} -> + noreply(State1); + {false, not_found} -> write_message(Guid, Msg, State1); - #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> + {Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }} -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> ok = index_delete(Guid, State1), write_message(Guid, Msg, State1); - [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State1), + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older then the client death + %% message, but seeing as it's being GC'd + %% currently, we'll have to write a new copy, + %% which will then be younger, so ignore this + %% write. 
+ noreply(State1); + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount, file = File } -> + {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC ok = index_update_ref_count(Guid, RefCount + 1, State1), @@ -731,12 +755,12 @@ handle_cast({write, CRef, Guid}, CTG; _ -> CTG1 end, - noreply(State #msstate { cref_to_guids = CTG2 }) + noreply(State1 #msstate { cref_to_guids = CTG2 }) end; handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, State2) end, + fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, State, Guids), State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), noreply(maybe_compact(State2)); @@ -801,6 +825,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, + dying_clients_ets = DyingClientsEts, client_refs = ClientRefs, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -814,8 +839,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), store_file_summary(FileSummaryEts, Dir), - [ets:delete(T) || - T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], + [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, + CurFileCacheEts, DyingClientsEts]], IndexModule:terminate(IndexState), store_recovery_terms([{client_refs, sets:to_list(ClientRefs)}, {index_module, IndexModule}], Dir), @@ -990,34 +1015,48 @@ contains_message(Guid, From, end end. 
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> - #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = - index_lookup_positive_ref_count(Guid, State), - %% only update field, otherwise bad interaction with concurrent GC - Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), - case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true } ] -> - add_to_pending_gc_completion({remove, Guid}, File, State); - [#file_summary {}] -> +remove_message(Guid, CRef, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts }) -> + case should_mask_action(CRef, Guid, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg + %% whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = + fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME here + %% because there may be further writes in the mailbox + %% for the same msg. 
+ 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, Guid, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, + -TotalSize}]), + delete_file_if_empty( + File, State #msstate { + sum_valid_data = SumValid - TotalSize }) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), ok = Dec(), - [_] = ets:update_counter( - FileSummaryEts, File, - [{#file_summary.valid_total_size, -TotalSize}]), - delete_file_if_empty( - File, State #msstate { - sum_valid_data = SumValid - TotalSize }) - end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), - ok = Dec(), - State + State + end end. add_to_pending_gc_completion( @@ -1039,8 +1078,8 @@ run_pending_action({read, Guid, From}, State) -> read_message(Guid, From, State); run_pending_action({contains, Guid, From}, State) -> contains_message(Guid, From, State); -run_pending_action({remove, Guid}, State) -> - remove_message(Guid, State). +run_pending_action({remove, Guid, CRef}, State) -> + remove_message(Guid, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1073,6 +1112,37 @@ client_confirm(CRef, Guids, error -> State end. +%% Detect whether the Guid is older or younger than the client's death +%% msg (if there is one). If the msg is older than the client death +%% msg, and it has a 0 ref_count we must only alter the ref_count it, +%% not rewrite the msg - rewriting it would make it younger than the +%% death msg and thus should be ignored. Note that this will +%% (correctly) return false when testing to remove the death msg +%% itself. 
+should_mask_action(CRef, Guid, + State = #msstate { dying_clients_ets = DyingClientsEts }) -> + Location = index_lookup(Guid, State), + {case ets:lookup(DyingClientsEts, CRef) of + [] -> + false; + [{_CRef, const}] -> + case Location of + not_found -> + true; + #msg_location { file = File, offset = Offset, + ref_count = RefCount } -> + #msg_location { file = DeathFile, offset = DeathOffset } = + index_lookup(CRef, State), + case {DeathFile, DeathOffset} < {File, Offset} of + true -> true; + false -> case RefCount of + 0 -> false_if_increment; + _ -> false + end + end + end + end, Location}. + %%---------------------------------------------------------------------------- %% file helper functions diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b9edad9a..8de5adbe 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1664,7 +1664,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> false -> ?TRANSIENT_MSG_STORE end, MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), - {A, B} = + {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> Guid = rabbit_guid:guid(), @@ -1673,6 +1673,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> ok = rabbit_msg_store:write(Guid, Guid, MSCState), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc]} end, {Qi, []}, SeqIds), + %% do this just to force all of the publishes through to the msg_store: + true = rabbit_msg_store:contains(LastGuidWritten, MSCState), ok = rabbit_msg_store:client_delete_and_terminate(MSCState), {A, B}. |