diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-17 19:40:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-01-17 19:40:55 +0000 |
commit | 021e6a2ea0bf4369bb7afb3b165dfd11909f7a6c (patch) | |
tree | 96aab6d0aece47d6dc505b67b0c0b09bce5604df /src/rabbit_msg_store.erl | |
parent | cec16c054acd689681fd7d677811c6d04bd0fd81 (diff) | |
download | rabbitmq-server-021e6a2ea0bf4369bb7afb3b165dfd11909f7a6c.tar.gz |
two flavours of 'write'
Just like in the other applications of credit_flow:send/ack, we need
two flavours of the surrounding API function, i.e. 'write' and
'write_flow' in this case. That way existing call sites, such as in
rabbit_test can remain undisturbed.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 30 |
1 files changed, 19 insertions, 11 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e75f5655..28eb8213 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2]). + write/3, write_flow/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -152,6 +152,7 @@ -spec(close_all_indicated/1 :: (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). @@ -461,14 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(MsgId, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef, - server = Server }) -> - ok = client_update_flying(+1, MsgId, CState), - ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), +write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> credit_flow:send(whereis(Server)), - ok = server_cast(CState, {write, CRef, MsgId}). + client_write(MsgId, Msg, flow, CState). + +write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -503,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). +client_write(MsgId, Msg, Flow, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId, Flow}). + client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -798,11 +803,14 @@ handle_cast({client_delete, CRef}, State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, +handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, clients = Clients }) -> - {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:ack(CPid), + case Flow of + flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:ack(CPid); + noflow -> ok + end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> |