diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-17 15:32:07 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-17 15:32:07 +0000 |
commit | 0dca9922eec9bdb12a801c8523d5d513476920a0 (patch) | |
tree | e5fce04d66401dfe8747a289fbb18050c46ecd60 /src/rabbit_msg_store.erl | |
parent | b0d278f1ca08e4fd315c1fc3344952e0173354dc (diff) | |
download | rabbitmq-server-0dca9922eec9bdb12a801c8523d5d513476920a0.tar.gz |
Flow control: vq -> msg_store.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 43 |
1 files changed, 26 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e6a32b90..a609c8a8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -436,7 +436,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( - Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), + Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun, self()}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -462,10 +463,12 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef }) -> + client_ref = CRef, + server = Server }) -> ok = client_update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), - ok = server_cast(CState, {write, CRef, MsgId}). + credit_flow:send(whereis(Server)), + ok = server_cast(CState, {write, CRef, self(), MsgId}). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -666,7 +669,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), Clients = dict:from_list( - [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]), + [{CRef, {undefined, undefined, undefined}} || + CRef <- ClientRefs1]), %% CleanShutdown => msg location index and file_summary both %% recovered correctly. true = case {FileSummaryRecovered, CleanShutdown} of @@ -731,10 +735,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _MsgId} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC, _CloseFDsFun, _Pid} -> 7; + {read, _MsgId} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -755,7 +759,7 @@ prioritise_info(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, +handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun, Pid}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -765,7 +769,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> - Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), + Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun, Pid}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -789,11 +793,14 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> + {_, _, Pid} = dict:fetch(CRef, Clients), + credit_flow:peer_down(Pid), State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, +handle_cast({write, CRef, CPid, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + credit_flow:ack(CPid), true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> @@ -1204,10 +1211,10 @@ update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of - {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), - State #msstate { - cref_to_msg_ids = CTM1 } + {undefined, _CloseFDsFun, _Pid} -> State; + {MsgOnDiskFun, _CloseFDsFun, _Pid} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. record_pending_confirm(CRef, MsgId, State) -> @@ -1294,8 +1301,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> case (ets:update_element(FileHandlesEts, Key, {2, close}) andalso Invoke) of true -> case dict:fetch(Ref, ClientRefs) of - {_MsgOnDiskFun, undefined} -> ok; - {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun() + {_MsgOnDiskFun, undefined, _Pid} -> + ok; + {_MsgOnDiskFun, CloseFDsFun, _Pid} -> + ok = CloseFDsFun() end; false -> ok end |