diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 22:43:15 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 22:43:15 +0100 |
commit | 4f92344718a60caf0b3d570a4b3ade1fb269d365 (patch) | |
tree | 27b0ceaa0aae9f757c08ca44ef845ae689fea5fa | |
parent | f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (diff) | |
download | rabbitmq-server-4f92344718a60caf0b3d570a4b3ade1fb269d365.tar.gz |
monitor workersbug26123
In a way this is a damage limitation exercise...
When a worker dies while working it is not in the pool anyway, so
noticing its death is a no-op.
When a worker dies while idle, it will not return to the pool when we
next submit work to it. But obviously the submitted work won't be
carried out, and the submitter will get an error (unless they
submitted the work asynchronously).
All the monitoring does is reduce the likelihood of the latter
happening. It cannot eliminate it though since the worker may die just
as work was being submitted to it.
-rw-r--r-- | src/worker_pool.erl | 14 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 2 |
2 files changed, 14 insertions, 2 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index db8c4e96..b1dba5a2 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,7 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -42,6 +42,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). +-spec(ready/1 :: (pid()) -> 'ok'). -spec(idle/1 :: (pid()) -> 'ok'). -endif. @@ -68,6 +69,8 @@ submit(Fun) -> submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). +ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}). + idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). %%---------------------------------------------------------------------------- @@ -88,6 +91,10 @@ handle_call({next_free, CPid}, _From, State = #state { available = handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. +handle_cast({ready, WPid}, State) -> + erlang:monitor(process, WPid), + handle_cast({idle, WPid}, State); + handle_cast({idle, WPid}, State = #state { available = Avail, pending = Pending }) -> {noreply, @@ -114,6 +121,11 @@ handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', _MRef, process, WPid, _Reason}, + State = #state { available = Avail }) -> + {noreply, State #state { available = ordsets:del_element(WPid, Avail) }, + hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index ef6f115a..beb95bc6 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -72,7 +72,7 @@ run(Fun) -> init([]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:idle(self()), + ok = worker_pool:ready(self()), put(worker_pool_worker, true), {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. |