summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-10-20 15:02:01 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-10-20 15:02:01 +0100
commit658b6f73e9b60fe5e26b8e0dc149d70d8226488f (patch)
treeff33ff4768b814fa64d97bc5ed0a5ce720d14483
parentb826cead9a368f108585ace8e9fe68132883be42 (diff)
downloadrabbitmq-server-658b6f73e9b60fe5e26b8e0dc149d70d8226488f.tar.gz
move msg_store server identity and client identity into client state.
still need to update vq
-rw-r--r--src/rabbit_msg_store.erl106
1 files changed, 58 insertions, 48 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 66cc06cf..29262c5d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,8 +34,8 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/2, client_terminate/2, client_delete_and_terminate/3,
- write/4, read/3, contains/2, remove/2, release/2, sync/3]).
+ client_init/2, client_terminate/1, client_delete_and_terminate/1,
+ write/3, read/2, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -86,7 +86,9 @@
}).
-record(client_msstate,
- { file_handle_cache,
+ { server,
+ client_ref,
+ file_handle_cache,
index_state,
index_module,
dir,
@@ -105,8 +107,11 @@
-ifdef(use_specs).
-type(server() :: pid() | atom()).
+-type(client_ref() :: binary()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
+ server :: server(),
+ client_ref :: client_ref(),
file_handle_cache :: dict:dictionary(),
index_state :: any(),
index_module :: atom(),
@@ -124,18 +129,18 @@
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
- rabbit_types:ok(client_msstate())).
--spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
--spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()).
+-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) ->
+ rabbit_types:ok(client_msstate())).
+-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
+ 'ok').
-spec(sync/1 :: (server()) -> 'ok').
-spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) ->
@@ -316,7 +321,9 @@ client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
+ #client_msstate { server = Server,
+ client_ref = Ref,
+ file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
dir = Dir,
@@ -326,20 +333,20 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState, Server) ->
+client_terminate(CState) ->
close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
+ ok = server_call(CState, client_terminate).
-client_delete_and_terminate(CState, Server, Ref) ->
+client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
+ ok = server_cast(CState, {client_delete, Ref}).
-write(Server, Guid, Msg,
+write(Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid}), CState}.
+ {server_cast(CState, {write, Guid}), CState}.
-read(Server, Guid,
+read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
@@ -348,13 +355,12 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:call(
- Server, {read, Guid}, infinity),
- CState} end,
+ Defer = fun() ->
+ {server_call(CState, {read, Guid}), CState}
+ end,
case index_lookup_positive_ref_count(Guid, CState) of
not_found -> Defer();
- MsgLocation -> client_read1(Server, MsgLocation, Defer,
- CState)
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
[{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -365,12 +371,12 @@ read(Server, Guid,
{{ok, Msg}, CState}
end.
-contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
-remove(_Server, []) -> ok;
-remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
-release(_Server, []) -> ok;
-release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
-sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
+contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+remove([], _CState) -> ok;
+remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+release([], _CState) -> ok;
+release(Guids, CState) -> server_cast(CState, {release, Guids}).
+sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
@@ -385,18 +391,22 @@ set_maximum_since_use(Server, Age) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-client_read1(Server,
- #msg_location { guid = Guid, file = File } = MsgLocation,
- Defer,
+server_call(#client_msstate { server = Server }, Msg) ->
+ gen_server2:call(Server, Msg, infinity).
+
+server_cast(#client_msstate { server = Server }, Msg) ->
+ gen_server2:cast(Server, Msg).
+
+client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = Locked, right = Right }] ->
- client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
+ client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
+client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
%% Although we've already checked both caches and not found the
%% message there, the message is apparently in the
%% current_file. We can only arrive here if we are trying to read
@@ -407,12 +417,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
%% contents of the current file, thus reads from the current file
%% will end up here and will need to be deferred.
Defer();
-client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
+client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
-client_read2(Server, false, _Right,
+client_read2(false, _Right,
MsgLocation = #msg_location { guid = Guid, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
@@ -421,10 +431,10 @@ client_read2(Server, false, _Right,
%% finished.
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
- fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end,
- fun () -> read(Server, Guid, CState) end).
+ fun (_) -> client_read3(MsgLocation, Defer, CState) end,
+ fun () -> read(Guid, CState) end).
-client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -448,7 +458,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -459,7 +469,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(Server, Guid, CState)
+ catch error:badarg -> read(Guid, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -484,7 +494,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
{{ok, Msg}, CState2};
MsgLocation -> %% different file!
Release(), %% this MUST NOT fail with badarg
- client_read1(Server, MsgLocation, Defer, CState)
+ client_read1(MsgLocation, Defer, CState)
end
end.