diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:14:00 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:14:00 +0100 |
commit | bc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (patch) | |
tree | f05aa6e0ce9419c6b7967be9af6631b35361bff2 | |
parent | c6e332a5723ee6aececdf2ef67398ccc26b32362 (diff) | |
download | rabbitmq-server-bc6e57ca96c63a7e54dc8540b0e0fe9f928075aa.tar.gz |
record workers in a set
...instead of a queue.
That way when an idle worker is restarted (and sends an 'idle' message
to the pool), it won't end up in the pool twice.
Note that we always hand out work to the first worker in the
ordset. That is actually more efficient than the round-robin strategy
we had with the queue since it keeps a smaller number of workers busy
while others can hibernate.
-rw-r--r-- | src/worker_pool.erl | 64 |
1 files changed, 29 insertions, 35 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 0f265e22..268c703f 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -76,52 +76,46 @@ idle(WId) -> %%---------------------------------------------------------------------------- init([]) -> - {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({next_free, CPid}, From, State = #state { available = Avail, - pending = Pending }) -> - case queue:out(Avail) of - {empty, _Avail} -> - {noreply, - State#state{pending = queue:in({next_free, From, CPid}, Pending)}, - hibernate}; - {{value, WId}, Avail1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - {reply, WPid, State #state { available = Avail1 }, - hibernate} - end; +handle_call({next_free, CPid}, From, State = #state { available = [], + pending = Pending }) -> + {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, + hibernate}; +handle_call({next_free, CPid}, _From, State = #state { available = + [WId | Avail1] }) -> + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. handle_cast({idle, WId}, State = #state { available = Avail, - pending = Pending }) -> - {noreply, case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From, CPid}}, Pending1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - gen_server2:reply(From, WPid), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { pending = Pending1 } - end, hibernate}; - -handle_cast({run_async, Fun}, State = #state { available = Avail, - pending = Pending }) -> + pending = Pending }) -> {noreply, - case queue:out(Avail) of - {empty, _Avail} -> - State #state { pending = queue:in({run_async, Fun}, Pending)}; - {{value, WId}, Avail1} -> + case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = ordsets:add_element(WId, Avail) }; + {{value, {next_free, From, CPid}}, Pending1} -> + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { available = Avail1 } + State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [], + pending = Pending }) -> + {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, + hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [WId | Avail1] }) -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + {noreply, State #state { available = Avail1 }, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |