summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:14:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:14:00 +0100
commitbc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (patch)
treef05aa6e0ce9419c6b7967be9af6631b35361bff2
parentc6e332a5723ee6aececdf2ef67398ccc26b32362 (diff)
downloadrabbitmq-server-bc6e57ca96c63a7e54dc8540b0e0fe9f928075aa.tar.gz
record workers in a set
...instead of a queue. That way when an idle worker is restarted (and sends an 'idle' message to the pool), it won't end up in the pool twice. Note that we always hand out work to the first worker in the ordset. That is actually more efficient than the round-robin strategy we had with the queue since it keeps a smaller number of workers busy while others can hibernate.
-rw-r--r--src/worker_pool.erl64
1 files changed, 29 insertions, 35 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 0f265e22..268c703f 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -76,52 +76,46 @@ idle(WId) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({next_free, CPid}, From, State = #state { available = Avail,
- pending = Pending }) ->
- case queue:out(Avail) of
- {empty, _Avail} ->
- {noreply,
- State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
- {{value, WId}, Avail1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 },
- hibernate}
- end;
+handle_call({next_free, CPid}, From, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
+ hibernate};
+handle_call({next_free, CPid}, _From, State = #state { available =
+ [WId | Avail1] }) ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = Avail,
- pending = Pending }) ->
+ pending = Pending }) ->
{noreply,
- case queue:out(Avail) of
- {empty, _Avail} ->
- State #state { pending = queue:in({run_async, Fun}, Pending)};
- {{value, WId}, Avail1} ->
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = ordsets:add_element(WId, Avail) };
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { available = Avail1 }
+ State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
+ hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [WId | Avail1] }) ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ {noreply, State #state { available = Avail1 }, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.