summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-17 19:40:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-17 19:40:55 +0000
commit021e6a2ea0bf4369bb7afb3b165dfd11909f7a6c (patch)
tree96aab6d0aece47d6dc505b67b0c0b09bce5604df /src/rabbit_msg_store.erl
parentcec16c054acd689681fd7d677811c6d04bd0fd81 (diff)
downloadrabbitmq-server-021e6a2ea0bf4369bb7afb3b165dfd11909f7a6c.tar.gz
two flavours of 'write'
Just like in the other applications of credit_flow:send/ack, we need two flavours of the surrounding API function, i.e. 'write' and 'write_flow' in this case. That way existing call sites, such as in rabbit_test can remain undisturbed.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl30
1 files changed, 19 insertions, 11 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e75f5655..28eb8213 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -461,14 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef,
- server = Server }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
credit_flow:send(whereis(Server)),
- ok = server_cast(CState, {write, CRef, MsgId}).
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -503,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -798,11 +803,14 @@ handle_cast({client_delete, CRef},
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
+handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
clients = Clients }) ->
- {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid),
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->