summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-17 15:32:07 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-17 15:32:07 +0000
commit0dca9922eec9bdb12a801c8523d5d513476920a0 (patch)
treee5fce04d66401dfe8747a289fbb18050c46ecd60 /src/rabbit_msg_store.erl
parentb0d278f1ca08e4fd315c1fc3344952e0173354dc (diff)
downloadrabbitmq-server-0dca9922eec9bdb12a801c8523d5d513476920a0.tar.gz
Flow control: vq -> msg_store.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl43
1 files changed, 26 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e6a32b90..a609c8a8 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -436,7 +436,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
- Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
+ Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun, self()},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -462,10 +463,12 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef }) ->
+ client_ref = CRef,
+ server = Server }) ->
ok = client_update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId}).
+ credit_flow:send(whereis(Server)),
+ ok = server_cast(CState, {write, CRef, self(), MsgId}).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -666,7 +669,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
Clients = dict:from_list(
- [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
+ [{CRef, {undefined, undefined, undefined}} ||
+ CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -731,10 +735,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _MsgId} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _MODC, _CloseFDsFun, _Pid} -> 7;
+ {read, _MsgId} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -755,7 +759,7 @@ prioritise_info(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
+handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun, Pid}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -765,7 +769,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun, Pid}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -789,11 +793,14 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
+ {_, _, Pid} = dict:fetch(CRef, Clients),
+ credit_flow:peer_down(Pid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
+handle_cast({write, CRef, CPid, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ credit_flow:ack(CPid),
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
@@ -1204,10 +1211,10 @@ update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
- {undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
- State #msstate {
- cref_to_msg_ids = CTM1 }
+ {undefined, _CloseFDsFun, _Pid} -> State;
+ {MsgOnDiskFun, _CloseFDsFun, _Pid} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
record_pending_confirm(CRef, MsgId, State) ->
@@ -1294,8 +1301,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
true -> case dict:fetch(Ref, ClientRefs) of
- {_MsgOnDiskFun, undefined} -> ok;
- {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ {_MsgOnDiskFun, undefined, _Pid} ->
+ ok;
+ {_MsgOnDiskFun, CloseFDsFun, _Pid} ->
+ ok = CloseFDsFun()
end;
false -> ok
end