summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-24 15:27:05 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-24 15:27:05 +0100
commit0157f0ccfc43f7b82f0791e6d3d68b9af632a634 (patch)
treed82d62964a4e4a3a9cded54129ed0afb4ae6d7da
parentd46aec5ae2cd306a623add21927d40b36859479f (diff)
downloadrabbitmq-server-0157f0ccfc43f7b82f0791e6d3d68b9af632a634.tar.gz
migrated msg_store to prioritisers
-rw-r--r--src/rabbit_msg_store.erl34
1 files changed, 26 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6576bfbb..3b3f678b 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -40,7 +40,7 @@
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_call/3, prioritise_cast/2]).
%%----------------------------------------------------------------------------
@@ -328,8 +328,8 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:pcall(
- Server, 2, {read, Guid}, infinity),
+ Defer = fun() -> {gen_server2:call(
+ Server, {read, Guid}, infinity),
CState} end,
case index_lookup(Guid, CState) of
not_found -> Defer();
@@ -351,18 +351,18 @@ remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
release(_Server, []) -> ok;
release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
-sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
+sync(Server) -> gen_server2:cast(Server, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
- gen_server2:pcast(Server, 8, {gc_done, Reclaimed, Source, Destination}).
+ gen_server2:cast(Server, {gc_done, Reclaimed, Source, Destination}).
set_maximum_since_use(Server, Age) ->
- gen_server2:pcast(Server, 8, {set_maximum_since_use, Age}).
+ gen_server2:cast(Server, {set_maximum_since_use, Age}).
client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:pcall(Server, 7, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref}, infinity),
#client_msstate { file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
@@ -382,7 +382,7 @@ client_delete_and_terminate(CState, Server, Ref) ->
ok = gen_server2:call(Server, {delete_client, Ref}, infinity).
successfully_recovered_state(Server) ->
- gen_server2:pcall(Server, 7, successfully_recovered_state, infinity).
+ gen_server2:call(Server, successfully_recovered_state, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -581,6 +581,24 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+prioritise_call({new_client_state, _Ref}, _From, _State) ->
+ 7;
+prioritise_call(successfully_recovered_state, _From, _State) ->
+ 7;
+prioritise_call({read, _Guid}, _From, _State) ->
+ 2;
+prioritise_call(_Msg, _From, _State) ->
+ 0.
+
+prioritise_cast(sync, _State) ->
+ 8;
+prioritise_cast({gc_done, _Reclaimed, _Source, _Destination}, _State) ->
+ 8;
+prioritise_cast({set_maximum_since_use, _Age}, _State) ->
+ 8;
+prioritise_cast(_Msg, _State) ->
+ 0.
+
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
noreply(State1);