summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-03-13 19:14:21 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-03-13 19:14:21 +0000
commit4af7804b122b33a034c6d390a4fbb5fd4c73b9db (patch)
tree832db5efd24f87d1f9c2f84b7e6e05d2b584fb79
parentf3576738879eee8e4fba089a5f754246724b5adf (diff)
downloadrabbitmq-server-4af7804b122b33a034c6d390a4fbb5fd4c73b9db.tar.gz
invoke credit_flow:peer_down on, err, DOWN. *only*
Thus preventing potential leaks and deadlocks. Also, clean up 'clients' in clear_client, and thus client_terminate, which was previously missing and thus the source of another potential leak.
-rw-r--r--src/rabbit_msg_store.erl19
1 files changed, 11 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9a4439a7..627335a5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -641,9 +641,11 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
end.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
- dying_clients = DyingClients }) ->
+ clients = Clients,
+ dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
- dying_clients = sets:del_element(CRef, DyingClients) }.
+ clients = dict:erase(CRef, Clients),
+ dying_clients = sets:del_element(CRef, DyingClients) }.
%%----------------------------------------------------------------------------
@@ -781,6 +783,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
+ erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -802,12 +805,8 @@ handle_cast({client_dying, CRef},
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
-handle_cast({client_delete, CRef},
- State = #msstate { clients = Clients }) ->
- {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:peer_down(CPid),
- State1 = State #msstate { clients = dict:erase(CRef, Clients) },
- noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
+handle_cast({client_delete, CRef}, State) ->
+ noreply(remove_message(CRef, CRef, clear_client(CRef, State)));
handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
@@ -888,6 +887,10 @@ handle_info(sync, State) ->
handle_info(timeout, State) ->
noreply(internal_sync(State));
+handle_info({'DOWN', _MRef, Pid, _Reason}, State) ->
+ credit_flow:peer_down(Pid),
+ noreply(State);
+
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.