summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-17 10:57:21 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-17 10:57:21 +0100
commit4ee9f333afe549f6aab8dffd97d2488bd1a72b6d (patch)
tree93fc1bf234ba60794f2112e132ecc5984f92babb
parent0dad80ccb953d0444f7432996accfd1195412cab (diff)
downloadrabbitmq-server-4ee9f333afe549f6aab8dffd97d2488bd1a72b6d.tar.gz
cosmetic: put msg_store functions into a more logical order
...and add a missing spec
-rw-r--r--src/rabbit_msg_store.erl109
1 files changed, 56 insertions, 53 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 81d3c501..66cc06cf 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,9 +33,9 @@
-behaviour(gen_server2).
--export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/2, client_terminate/2,
- client_delete_and_terminate/3, successfully_recovered_state/1]).
+-export([start_link/4, successfully_recovered_state/1,
+ client_init/2, client_terminate/2, client_delete_and_terminate/3,
+ write/4, read/3, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -123,6 +123,11 @@
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
+-spec(client_init/2 :: (server(), binary()) -> client_msstate()).
+-spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
+-spec(client_delete_and_terminate/3 ::
+ (client_msstate(), server(), binary()) -> 'ok').
-spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
rabbit_types:ok(client_msstate())).
-spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
@@ -131,15 +136,11 @@
-spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
-spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+
+-spec(sync/1 :: (server()) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(successfully_recovered_state/1 :: (server()) -> boolean()).
-
-spec(gc/3 :: (non_neg_integer(), non_neg_integer(),
{ets:tid(), file:filename(), atom(), any()}) ->
'concurrent_readers' | non_neg_integer()).
@@ -308,6 +309,31 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
[Server, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
+successfully_recovered_state(Server) ->
+ gen_server2:call(Server, successfully_recovered_state, infinity).
+
+client_init(Server, Ref) ->
+ {IState, IModule, Dir, GCPid,
+ FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ #client_msstate { file_handle_cache = dict:new(),
+ index_state = IState,
+ index_module = IModule,
+ dir = Dir,
+ gc_pid = GCPid,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }.
+
+client_terminate(CState, Server) ->
+ close_all_handles(CState),
+ ok = gen_server2:call(Server, client_terminate, infinity).
+
+client_delete_and_terminate(CState, Server, Ref) ->
+ close_all_handles(CState),
+ ok = gen_server2:cast(Server, {client_delete, Ref}).
+
write(Server, Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
@@ -345,7 +371,9 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:cast(Server, sync). %% internal
+
+sync(Server) ->
+ gen_server2:cast(Server, sync).
gc_done(Server, Reclaimed, Source, Destination) ->
gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
@@ -353,31 +381,6 @@ gc_done(Server, Reclaimed, Source, Destination) ->
set_maximum_since_use(Server, Age) ->
gen_server2:cast(Server, {set_maximum_since_use, Age}).
-client_init(Server, Ref) ->
- {IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
- index_state = IState,
- index_module = IModule,
- dir = Dir,
- gc_pid = GCPid,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }.
-
-client_terminate(CState, Server) ->
- close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
-
-client_delete_and_terminate(CState, Server, Ref) ->
- close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
-
-successfully_recovered_state(Server) ->
- gen_server2:call(Server, successfully_recovered_state, infinity).
-
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
@@ -577,8 +580,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- {new_client_state, _Ref} -> 7;
successfully_recovered_state -> 7;
+ {new_client_state, _Ref} -> 7;
{read, _Guid} -> 2;
_ -> 0
end.
@@ -591,13 +594,8 @@ prioritise_cast(Msg, _State) ->
_ -> 0
end.
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
- noreply(State1);
-
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
- noreply(State1);
+handle_call(successfully_recovered_state, _From, State) ->
+ reply(State #msstate.successfully_recovered, State);
handle_call({new_client_state, CRef}, _From,
State = #msstate { dir = Dir,
@@ -613,11 +611,21 @@ handle_call({new_client_state, CRef}, _From,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
-handle_call(successfully_recovered_state, _From, State) ->
- reply(State #msstate.successfully_recovered, State);
-
handle_call(client_terminate, _From, State) ->
- reply(ok, State).
+ reply(ok, State);
+
+handle_call({read, Guid}, From, State) ->
+ State1 = read_message(Guid, From, State),
+ noreply(State1);
+
+handle_call({contains, Guid}, From, State) ->
+ State1 = contains_message(Guid, From, State),
+ noreply(State1).
+
+handle_cast({client_delete, CRef},
+ State = #msstate { client_refs = ClientRefs }) ->
+ noreply(
+ State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
handle_cast({write, Guid},
State = #msstate { sum_valid_data = SumValid,
@@ -712,12 +720,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
-handle_cast({client_delete, CRef},
- State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
+ noreply(State).
handle_info(timeout, State) ->
noreply(internal_sync(State));