diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 18:35:19 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-10-06 18:35:19 +0100 |
commit | 607e3c7cb3368bf7ebb193ec4c3e98449cbd2aec (patch) | |
tree | d05c496fd62244cfb58874a287e4d2f269db2333 | |
parent | 7d15df90953e33d7e7c20b5268a63fa5df4607b4 (diff) | |
parent | 89116a5512be7754476803a7d03bb692672c386d (diff) | |
download | rabbitmq-server-607e3c7cb3368bf7ebb193ec4c3e98449cbd2aec.tar.gz |
Merging default into bug 23111
-rw-r--r-- | include/rabbit_msg_store_index.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 175 | ||||
-rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 6 |
3 files changed, 103 insertions, 79 deletions
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl index fba0b7cd..d4115363 100644 --- a/include/rabbit_msg_store_index.hrl +++ b/include/rabbit_msg_store_index.hrl @@ -51,6 +51,7 @@ [{fieldpos(), fieldvalue()}]), index_state()) -> 'ok'). -spec(delete/2 :: (rabbit_guid:guid(), index_state()) -> 'ok'). +-spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok'). -spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok'). -spec(terminate/1 :: (index_state()) -> any()). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bbecbfe2..81d3c501 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -325,7 +325,7 @@ read(Server, Guid, Defer = fun() -> {gen_server2:call( Server, {read, Guid}, infinity), CState} end, - case index_lookup(Guid, CState) of + case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); MsgLocation -> client_read1(Server, MsgLocation, Defer, CState) @@ -620,45 +620,31 @@ handle_call(client_terminate, _From, State) -> reply(ok, State). handle_cast({write, Guid}, - State = #msstate { current_file_handle = CurHdl, - current_file = CurFile, - sum_valid_data = SumValid, - sum_file_size = SumFileSize, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> - %% New message, lots to do - {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), - ok = index_insert(#msg_location { - guid = Guid, ref_count = 1, file = CurFile, - offset = CurOffset, total_size = TotalSize }, - State), - [#file_summary { valid_total_size = ValidTotalSize, - right = undefined, - locked = false, - file_size = FileSize }] = - ets:lookup(FileSummaryEts, CurFile), - ValidTotalSize1 = ValidTotalSize + TotalSize, - true = ets:update_element( - FileSummaryEts, CurFile, - [{#file_summary.valid_total_size, ValidTotalSize1}, - {#file_summary.file_size, FileSize + TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply( - maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); + write_message(Guid, Msg, State); + #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + ok = index_delete(Guid, State), + write_message(Guid, Msg, State); + [#file_summary {}] -> + ok = index_update_ref_count(Guid, 1, State), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, TotalSize}]), + noreply(State #msstate { + sum_valid_data = SumValid + TotalSize }) + end; #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_fields(Guid, - {#msg_location.ref_count, RefCount + 1}, - State), + ok = index_update_ref_count(Guid, RefCount + 1, State), noreply(State) end; @@ -812,9 +798,31 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. +write_message(Guid, Msg, + State = #msstate { current_file_handle = CurHdl, + current_file = CurFile, + sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary_ets = FileSummaryEts }) -> + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + ok = index_insert( + #msg_location { guid = Guid, ref_count = 1, file = CurFile, + offset = CurOffset, total_size = TotalSize }, State), + [#file_summary { right = undefined, locked = false }] = + ets:lookup(FileSummaryEts, CurFile), + [_,_] = ets:update_counter(FileSummaryEts, CurFile, + [{#file_summary.valid_total_size, TotalSize}, + {#file_summary.file_size, TotalSize}]), + NextOffset = CurOffset + TotalSize, + noreply(maybe_roll_to_new_file( + NextOffset, State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize })). + read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> - case index_lookup(Guid, State) of + case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, not_found), State; @@ -887,7 +895,7 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, {Msg, State1}. contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> - case index_lookup(Guid, State) of + case index_lookup_positive_ref_count(Guid, State) of not_found -> gen_server2:reply(From, false), State; @@ -906,36 +914,30 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize } = index_lookup(Guid, State), + total_size = TotalSize } = + index_lookup_positive_ref_count(Guid, State), + %% only update field, otherwise bad interaction with concurrent GC + Dec = fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, case RefCount of - 1 -> - %% don't remove from CUR_FILE_CACHE_ETS_NAME here because - %% there may be further writes in the mailbox for the same - %% msg. - ok = remove_cache_entry(DedupCacheEts, Guid), - [#file_summary { valid_total_size = ValidTotalSize, - locked = Locked }] = - ets:lookup(FileSummaryEts, File), - case Locked of - true -> - add_to_pending_gc_completion({remove, Guid}, State); - false -> - ok = index_delete(Guid, State), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = - ets:update_element( - FileSummaryEts, File, - [{#file_summary.valid_total_size, ValidTotalSize1}]), - State1 = delete_file_if_empty(File, State), - State1 #msstate { sum_valid_data = SumValid - TotalSize } - end; - _ when 1 < RefCount -> - ok = decrement_cache(DedupCacheEts, Guid), - %% only update field, otherwise bad interaction with concurrent GC - ok = index_update_fields(Guid, - {#msg_location.ref_count, RefCount - 1}, - State), - State + %% don't remove from CUR_FILE_CACHE_ETS_NAME here because + %% there may be further writes in the mailbox for the same + %% msg. + 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true } ] -> + add_to_pending_gc_completion({remove, Guid}, State); + [#file_summary {}] -> + ok = Dec(), + [_] = ets:update_counter( + FileSummaryEts, File, + [{#file_summary.valid_total_size, -TotalSize}]), + delete_file_if_empty( + File, State #msstate { + sum_valid_data = SumValid - TotalSize }) + end; + _ -> ok = decrement_cache(DedupCacheEts, Guid), + ok = Dec(), + State end. add_to_pending_gc_completion( @@ -1106,6 +1108,16 @@ decrement_cache(DedupCacheEts, Guid) -> %% index %%---------------------------------------------------------------------------- +index_lookup_positive_ref_count(Key, State) -> + case index_lookup(Key, State) of + not_found -> not_found; + #msg_location { ref_count = 0 } -> not_found; + #msg_location {} = MsgLocation -> MsgLocation + end. + +index_update_ref_count(Key, RefCount, State) -> + index_update_fields(Key, {#msg_location.ref_count, RefCount}, State). + index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> Index:lookup(Key, State); @@ -1498,6 +1510,10 @@ delete_file_if_empty(File, State = #msstate { end, true = mark_handle_to_close(FileHandlesEts, File), true = ets:delete(FileSummaryEts, File), + {ok, Messages, FileSize} = + scan_file_for_valid_messages(Dir, filenum_to_name(File)), + [index_delete(Guid, State) || + {Guid, _TotalSize, _Offset} <- Messages], State1 = close_handle(File, State), ok = file:delete(form_filename(Dir, filenum_to_name(File))), State1 #msstate { sum_file_size = SumFileSize - FileSize }; @@ -1553,7 +1569,7 @@ combine_files(#file_summary { file = Source, %% copy back in, and then copy over from Source %% otherwise we just truncate straight away and copy over from Source {DestinationWorkList, DestinationValid} = - find_unremoved_messages_in_file(Destination, State), + load_and_vacuum_message_file(Destination, State), {DestinationContiguousTop, DestinationWorkListTail} = drop_contiguous_block_prefix(DestinationWorkList), case DestinationWorkListTail of @@ -1579,8 +1595,7 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:sync(DestinationHdl), ok = file_handle_cache:delete(TmpHdl) end, - {SourceWorkList, SourceValid} = - find_unremoved_messages_in_file(Source, State), + {SourceWorkList, SourceValid} = load_and_vacuum_message_file(Source, State), ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, SourceHdl, DestinationHdl, Destination, State), %% tidy up @@ -1588,21 +1603,25 @@ combine_files(#file_summary { file = Source, ok = file_handle_cache:delete(SourceHdl), ExpectedSize. -find_unremoved_messages_in_file(File, - {_FileSummaryEts, Dir, Index, IndexState}) -> +load_and_vacuum_message_file(File, {_FileSummaryEts, Dir, Index, IndexState}) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> - case Index:lookup(Guid, IndexState) of - #msg_location { file = File, total_size = TotalSize, - offset = Offset } = Entry -> - {[ Entry | List ], TotalSize + Size}; - _ -> - Acc - end - end, {[], 0}, Messages). + lists:foldl( + fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> + case Index:lookup(Guid, IndexState) of + #msg_location { file = File, total_size = TotalSize, + offset = Offset, ref_count = 0 } = Entry -> + ok = Index:delete_object(Entry, IndexState), + Acc; + #msg_location { file = File, total_size = TotalSize, + offset = Offset } = Entry -> + {[ Entry | List ], TotalSize + Size}; + _ -> + Acc + end + end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl index 1eb3c11f..96be674c 100644 --- a/src/rabbit_msg_store_ets_index.erl +++ b/src/rabbit_msg_store_ets_index.erl @@ -35,7 +35,7 @@ -export([new/1, recover/1, lookup/2, insert/2, update/2, update_fields/3, delete/2, - delete_by_file/2, terminate/1]). + delete_object/2, delete_by_file/2, terminate/1]). -define(MSG_LOC_NAME, rabbit_msg_store_ets_index). -define(FILENAME, "msg_store_index.ets"). @@ -79,6 +79,10 @@ delete(Key, State) -> true = ets:delete(State #state.table, Key), ok. +delete_object(Obj, State) -> + true = ets:delete_object(State #state.table, Obj), + ok. + delete_by_file(File, State) -> MatchHead = #msg_location { file = File, _ = '_' }, ets:select_delete(State #state.table, [{MatchHead, [], [true]}]), |