diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-20 15:02:01 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-20 15:02:01 +0100 |
commit | 658b6f73e9b60fe5e26b8e0dc149d70d8226488f (patch) | |
tree | ff33ff4768b814fa64d97bc5ed0a5ce720d14483 | |
parent | b826cead9a368f108585ace8e9fe68132883be42 (diff) | |
download | rabbitmq-server-658b6f73e9b60fe5e26b8e0dc149d70d8226488f.tar.gz |
move msg_store server identity and client identity into client state.
still need to update vq
-rw-r--r-- | src/rabbit_msg_store.erl | 106 |
1 files changed, 58 insertions, 48 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 66cc06cf..29262c5d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,8 +34,8 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, - write/4, read/3, contains/2, remove/2, release/2, sync/3]). + client_init/2, client_terminate/1, client_delete_and_terminate/1, + write/3, read/2, contains/2, remove/2, release/2, sync/3]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -86,7 +86,9 @@ }). -record(client_msstate, - { file_handle_cache, + { server, + client_ref, + file_handle_cache, index_state, index_module, dir, @@ -105,8 +107,11 @@ -ifdef(use_specs). -type(server() :: pid() | atom()). +-type(client_ref() :: binary()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { + server :: server(), + client_ref :: client_ref(), file_handle_cache :: dict:dictionary(), index_state :: any(), index_module :: atom(), @@ -124,18 +129,18 @@ (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). --spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). --spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> + rabbit_types:ok(client_msstate())). +-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> + 'ok'). -spec(sync/1 :: (server()) -> 'ok'). -spec(gc_done/4 :: (server(), non_neg_integer(), file_num(), file_num()) -> @@ -316,7 +321,9 @@ client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), + #client_msstate { server = Server, + client_ref = Ref, + file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, @@ -326,20 +333,20 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState, Server) -> +client_terminate(CState) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = server_call(CState, client_terminate). -client_delete_and_terminate(CState, Server, Ref) -> +client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). + ok = server_cast(CState, {client_delete, Ref}). -write(Server, Guid, Msg, +write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + {server_cast(CState, {write, Guid}), CState}. -read(Server, Guid, +read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache @@ -348,13 +355,12 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:call( - Server, {read, Guid}, infinity), - CState} end, + Defer = fun() -> + {server_call(CState, {read, Guid}), CState} + end, case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -365,12 +371,12 @@ read(Server, Guid, {{ok, Msg}, CState} end. -contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). -remove(_Server, []) -> ok; -remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). -release(_Server, []) -> ok; -release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). -sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +contains(Guid, CState) -> server_call(CState, {contains, Guid}). +remove([], _CState) -> ok; +remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +release([], _CState) -> ok; +release(Guids, CState) -> server_cast(CState, {release, Guids}). +sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). @@ -385,18 +391,22 @@ set_maximum_since_use(Server, Age) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(Server, - #msg_location { guid = Guid, file = File } = MsgLocation, - Defer, +server_call(#client_msstate { server = Server }, Msg) -> + gen_server2:call(Server, Msg, infinity). + +server_cast(#client_msstate { server = Server }, Msg) -> + gen_server2:cast(Server, Msg). + +client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + client_read2(Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> +client_read2(false, undefined, _MsgLocation, Defer, _CState) -> %% Although we've already checked both caches and not found the %% message there, the message is apparently in the %% current_file. We can only arrive here if we are trying to read @@ -407,12 +417,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> %% contents of the current file, thus reads from the current file %% will end up here and will need to be deferred. Defer(); -client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(Server, false, _Right, +client_read2(false, _Right, MsgLocation = #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> @@ -421,10 +431,10 @@ client_read2(Server, false, _Right, %% finished. safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, - fun () -> read(Server, Guid, CState) end). + fun (_) -> client_read3(MsgLocation, Defer, CState) end, + fun () -> read(Guid, CState) end). -client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -448,7 +458,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -459,7 +469,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, Guid, CState) + catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -484,7 +494,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, {{ok, Msg}, CState2}; MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState) + client_read1(MsgLocation, Defer, CState) end end. |