diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-31 13:15:10 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-03-31 13:15:10 +0100 |
commit | 3dae00aebf9bf45520219858374783834bb6f7d6 (patch) | |
tree | 567653d1dd05c18254c55629762a376798bb5c19 | |
parent | a4c348672a43acd05303a38af8c08196924fb650 (diff) | |
parent | a8ff68e6b4a856778897d8a1c357a1bf6cfcb1f3 (diff) | |
download | rabbitmq-server-3dae00aebf9bf45520219858374783834bb6f7d6.tar.gz |
merging bug23968 to default
-rw-r--r-- | src/rabbit_msg_store.erl | 139 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
3 files changed, 30 insertions, 118 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bb26de64..e3aae572 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2, release/2, sync/3]). + write/3, read/2, contains/2, remove/2, sync/3]). -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -67,7 +67,6 @@ gc_pid, %% pid of our GC file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table - dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table dying_clients, %% set of dying clients clients, %% map of references of all registered clients @@ -87,7 +86,6 @@ gc_pid, file_handles_ets, file_summary_ets, - dedup_cache_ets, cur_file_cache_ets }). @@ -130,7 +128,6 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - dedup_cache_ets :: ets:tid(), cur_file_cache_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | @@ -153,7 +150,6 @@ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). -spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). --spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). -spec(sync/3 :: ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). @@ -396,7 +392,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -408,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> @@ -429,27 +424,16 @@ write(MsgId, Msg, ok = server_cast(CState, {write, CRef, MsgId}). read(MsgId, - CState = #client_msstate { dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }) -> - %% 1. Check the dedup cache - case fetch_and_increment_cache(DedupCacheEts, MsgId) of - not_found -> - %% 2. Check the cur file cache - case ets:lookup(CurFileCacheEts, MsgId) of - [] -> - Defer = fun() -> - {server_call(CState, {read, MsgId}), CState} - end, - case index_lookup_positive_ref_count(MsgId, CState) of - not_found -> Defer(); - MsgLocation -> client_read1(MsgLocation, Defer, CState) - end; - [{MsgId, Msg, _CacheRefCount}] -> - %% Although we've found it, we don't know the - %% refcount, so can't insert into dedup cache - {{ok, Msg}, CState} + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + %% Check the cur file cache + case ets:lookup(CurFileCacheEts, MsgId) of + [] -> + Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end, + case index_lookup_positive_ref_count(MsgId, CState) of + not_found -> Defer(); + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - Msg -> + [{MsgId, Msg, _CacheRefCount}] -> {{ok, Msg}, CState} end. @@ -457,8 +441,6 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -release([], _CState) -> ok; -release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}). sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). sync(Server) -> @@ -517,7 +499,6 @@ client_read2(false, _Right, client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, gc_pid = GCPid, client_ref = Ref }) -> Release = @@ -574,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, %% Could the msg_store now mark the file to be %% closed? No: marks for closing are issued only %% when the msg_store has locked the file. - {Msg, CState2} = %% This will never be the current file - read_from_disk(MsgLocation, CState1, DedupCacheEts), + %% This will never be the current file + {Msg, CState2} = read_from_disk(MsgLocation, CState1), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; #msg_location {} = MsgLocation -> %% different file! @@ -639,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> %% CleanShutdown <=> msg location index and file_summary both %% recovered correctly. - DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), @@ -669,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, dying_clients = sets:new(), clients = Clients, @@ -720,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, index_module = IndexModule, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), - reply({IndexState, IndexModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { clients = Clients1 }); + reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, + CurFileCacheEts}, State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -781,12 +758,6 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({release, MsgIds}, State = - #msstate { dedup_cache_ets = DedupCacheEts }) -> - lists:foreach( - fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), - noreply(State); - handle_cast({sync, MsgIds, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -840,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState, gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, clients = Clients, dir = Dir }) -> @@ -856,7 +826,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), [true = ets:delete(T) || - T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], + T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -978,26 +948,18 @@ write_message(MsgId, Msg, sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize }). -read_message(MsgId, From, - State = #msstate { dedup_cache_ets = DedupCacheEts }) -> +read_message(MsgId, From, State) -> case index_lookup_positive_ref_count(MsgId, State) of - not_found -> - gen_server2:reply(From, not_found), - State; - MsgLocation -> - case fetch_and_increment_cache(DedupCacheEts, MsgId) of - not_found -> read_message1(From, MsgLocation, State); - Msg -> gen_server2:reply(From, {ok, Msg}), - State - end + not_found -> gen_server2:reply(From, not_found), + State; + MsgLocation -> read_message1(From, MsgLocation, State) end. -read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset } = MsgLoc, +read_message1(From, #msg_location { msg_id = MsgId, file = File, + offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> case File =:= CurFile of true -> {Msg, State1} = @@ -1010,10 +972,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, true -> file_handle_cache:flush(CurHdl); false -> ok end, - read_from_disk(MsgLoc, State, DedupCacheEts); + read_from_disk(MsgLoc, State); [{MsgId, Msg1, _CacheRefCount}] -> - ok = maybe_insert_into_cache( - DedupCacheEts, RefCount, MsgId, Msg1), {Msg1, State} end, gen_server2:reply(From, {ok, Msg}), @@ -1023,17 +983,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, case Locked of true -> add_to_pending_gc_completion({read, MsgId, From}, File, State); - false -> {Msg, State1} = - read_from_disk(MsgLoc, State, DedupCacheEts), + false -> {Msg, State1} = read_from_disk(MsgLoc, State), gen_server2:reply(From, {ok, Msg}), State1 end end. -read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, - file = File, offset = Offset, - total_size = TotalSize }, - State, DedupCacheEts) -> +read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset, + total_size = TotalSize }, State) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), {ok, {MsgId, Msg}} = @@ -1049,7 +1006,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, {proc_dict, get()} ]}} end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {Msg, State1}. contains_message(MsgId, From, @@ -1068,8 +1024,7 @@ contains_message(MsgId, From, end. remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts }) -> + State = #msstate { file_summary_ets = FileSummaryEts }) -> case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; @@ -1090,8 +1045,7 @@ remove_message(MsgId, CRef, %% don't remove from CUR_FILE_CACHE_ETS_NAME here %% because there may be further writes in the mailbox %% for the same msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId), - case ets:lookup(FileSummaryEts, File) of + 1 -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> add_to_pending_gc_completion( {remove, MsgId, CRef}, File, State); @@ -1101,8 +1055,7 @@ remove_message(MsgId, CRef, File, adjust_valid_total_size(File, -TotalSize, State)) end; - _ -> ok = decrement_cache(DedupCacheEts, MsgId), - ok = Dec(), + _ -> ok = Dec(), State end end. @@ -1325,12 +1278,6 @@ list_sorted_file_names(Dir, Ext) -> %% message cache helper functions %%---------------------------------------------------------------------------- -maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) - when RefCount > 1 -> - update_msg_cache(DedupCacheEts, MsgId, Msg); -maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> - ok. - update_msg_cache(CacheEts, MsgId, Msg) -> case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of true -> ok; @@ -1339,34 +1286,6 @@ update_msg_cache(CacheEts, MsgId, Msg) -> fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) end. -remove_cache_entry(DedupCacheEts, MsgId) -> - true = ets:delete(DedupCacheEts, MsgId), - ok. - -fetch_and_increment_cache(DedupCacheEts, MsgId) -> - case ets:lookup(DedupCacheEts, MsgId) of - [] -> - not_found; - [{_MsgId, Msg, _RefCount}] -> - safe_ets_update_counter_ok( - DedupCacheEts, MsgId, {3, +1}, - %% someone has deleted us in the meantime, insert us - fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end), - Msg - end. - -decrement_cache(DedupCacheEts, MsgId) -> - true = safe_ets_update_counter( - DedupCacheEts, MsgId, {3, -1}, - fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); - (_N) -> true - end, - %% MsgId is not in there because although it's been - %% delivered, it's never actually been read (think: - %% persistent message held in RAM) - fun () -> true end), - ok. - %%---------------------------------------------------------------------------- %% index %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index fb1c9a34..294fae97 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1786,8 +1786,6 @@ test_msg_store() -> true = msg_store_contains(true, MsgIds2ndHalf, MSCState2), %% read the second half again MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), - %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(MsgIds2ndHalf, MSCState3), %% read the second half again, just for fun (aka code coverage) MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), ok = rabbit_msg_store:client_terminate(MSCState4), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7a1102e5..ff7252fd 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -728,7 +728,7 @@ requeue(AckTags, MsgPropsFun, State) -> needs_confirming = false } end, a(reduce_memory_use( - ack(fun msg_store_release/3, + ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), true, false, State1), @@ -972,11 +972,6 @@ msg_store_remove(MSCState, IsPersistent, MsgIds) -> MSCState, IsPersistent, fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). -msg_store_release(MSCState, IsPersistent, MsgIds) -> - with_immutable_msg_store_state( - MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:release(MsgIds, MCSState1) end). - msg_store_sync(MSCState, IsPersistent, MsgIds, Fun) -> with_immutable_msg_store_state( MSCState, IsPersistent, |