diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-17 10:57:21 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-17 10:57:21 +0100 |
commit | 4ee9f333afe549f6aab8dffd97d2488bd1a72b6d (patch) | |
tree | 93fc1bf234ba60794f2112e132ecc5984f92babb | |
parent | 0dad80ccb953d0444f7432996accfd1195412cab (diff) | |
download | rabbitmq-server-4ee9f333afe549f6aab8dffd97d2488bd1a72b6d.tar.gz |
cosmetic: put msg_store functions into a more logical order
...and add a missing spec
-rw-r--r-- | src/rabbit_msg_store.erl | 109 |
1 files changed, 56 insertions, 53 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 81d3c501..66cc06cf 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,9 +33,9 @@ -behaviour(gen_server2). --export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/2, client_terminate/2, - client_delete_and_terminate/3, successfully_recovered_state/1]). +-export([start_link/4, successfully_recovered_state/1, + client_init/2, client_terminate/2, client_delete_and_terminate/3, + write/4, read/3, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -123,6 +123,11 @@ -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). +-spec(successfully_recovered_state/1 :: (server()) -> boolean()). +-spec(client_init/2 :: (server(), binary()) -> client_msstate()). +-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). +-spec(client_delete_and_terminate/3 :: + (client_msstate(), server(), binary()) -> 'ok'). -spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> @@ -131,15 +136,11 @@ -spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). -spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). + +-spec(sync/1 :: (server()) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(successfully_recovered_state/1 :: (server()) -> boolean()). - -spec(gc/3 :: (non_neg_integer(), non_neg_integer(), {ets:tid(), file:filename(), atom(), any()}) -> 'concurrent_readers' | non_neg_integer()). @@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> [Server, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). +successfully_recovered_state(Server) -> + gen_server2:call(Server, successfully_recovered_state, infinity). + +client_init(Server, Ref) -> + {IState, IModule, Dir, GCPid, + FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = + gen_server2:call(Server, {new_client_state, Ref}, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir, + gc_pid = GCPid, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts }. + +client_terminate(CState, Server) -> + close_all_handles(CState), + ok = gen_server2:call(Server, client_terminate, infinity). + +client_delete_and_terminate(CState, Server, Ref) -> + close_all_handles(CState), + ok = gen_server2:cast(Server, {client_delete, Ref}). + write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), @@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). release(_Server, []) -> ok; release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). -sync(Server) -> gen_server2:cast(Server, sync). %% internal + +sync(Server) -> + gen_server2:cast(Server, sync). gc_done(Server, Reclaimed, Source, Destination) -> gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}). @@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) -> set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). -client_init(Server, Ref) -> - {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), - index_state = IState, - index_module = IModule, - dir = Dir, - gc_pid = GCPid, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts }. - -client_terminate(CState, Server) -> - close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). - -client_delete_and_terminate(CState, Server, Ref) -> - close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). - -successfully_recovered_state(Server) -> - gen_server2:call(Server, successfully_recovered_state, infinity). - %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- @@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - {new_client_state, _Ref} -> 7; successfully_recovered_state -> 7; + {new_client_state, _Ref} -> 7; {read, _Guid} -> 2; _ -> 0 end. @@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -handle_call({read, Guid}, From, State) -> - State1 = read_message(Guid, From, State), - noreply(State1); - -handle_call({contains, Guid}, From, State) -> - State1 = contains_message(Guid, From, State), - noreply(State1); +handle_call(successfully_recovered_state, _From, State) -> + reply(State #msstate.successfully_recovered, State); handle_call({new_client_state, CRef}, _From, State = #msstate { dir = Dir, @@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); -handle_call(successfully_recovered_state, _From, State) -> - reply(State #msstate.successfully_recovered, State); - handle_call(client_terminate, _From, State) -> - reply(ok, State). + reply(ok, State); + +handle_call({read, Guid}, From, State) -> + State1 = read_message(Guid, From, State), + noreply(State1); + +handle_call({contains, Guid}, From, State) -> + State1 = contains_message(Guid, From, State), + noreply(State1). + +handle_cast({client_delete, CRef}, + State = #msstate { client_refs = ClientRefs }) -> + noreply( + State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); handle_cast({write, Guid}, State = #msstate { sum_valid_data = SumValid, @@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - -handle_cast({client_delete, CRef}, - State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). + noreply(State). handle_info(timeout, State) -> noreply(internal_sync(State)); |