summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile5
-rw-r--r--include/gm_specs.hrl3
-rw-r--r--src/gm.erl43
-rw-r--r--src/gm_soak_test.erl4
-rw-r--r--src/gm_speed_test.erl4
-rw-r--r--src/gm_tests.erl4
-rw-r--r--src/mirrored_supervisor.erl8
-rw-r--r--src/rabbit_amqqueue.erl24
-rw-r--r--src/rabbit_amqqueue_process.erl38
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl12
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl70
-rw-r--r--src/rabbit_mirror_queue_slave.erl40
-rw-r--r--src/rabbit_reader.erl3
-rw-r--r--src/worker_pool.erl104
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl54
17 files changed, 200 insertions, 220 deletions
diff --git a/Makefile b/Makefile
index e413f879..81a477ae 100644
--- a/Makefile
+++ b/Makefile
@@ -194,6 +194,11 @@ run: all
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
./scripts/rabbitmq-server
+run-background: all
+ $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
+ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
+ ./scripts/rabbitmq-server -detached
+
run-node: all
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index f4ea0df8..245c23bc 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -21,8 +21,7 @@
-type(members() :: [pid()]).
-spec(joined/2 :: (args(), members()) -> callback_result()).
--spec(members_changed/4 :: (args(), members(),
- members(), members()) -> callback_result()).
+-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()).
-spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()).
-spec(terminate/2 :: (args(), term()) -> any()).
diff --git a/src/gm.erl b/src/gm.erl
index 2ed2fcf1..1eaf7459 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -476,8 +476,8 @@
%% joined/2 before receiving any messages from it; and (2) we will not
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
--callback members_changed(Args :: term(), Births :: [pid()],
- Deaths :: [pid()], Live :: [pid()]) ->
+-callback members_changed(Args :: term(),
+ Births :: [pid()], Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
@@ -496,7 +496,7 @@
-else.
behaviour_info(callbacks) ->
- [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}];
+ [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}];
behaviour_info(_Other) ->
undefined.
@@ -581,7 +581,11 @@ handle_call({confirmed_broadcast, Msg}, _From,
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
- internal_broadcast(Msg, From, 0, State);
+ {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} =
+ internal_broadcast(Msg, 0, State),
+ Confirms1 = queue:in({PubCount, From}, Confirms),
+ handle_callback_result({Result, flush_broadcast_buffer(
+ State1 #state { confirms = Confirms1 })});
handle_call(info, _From,
State = #state { members_state = undefined }) ->
@@ -657,7 +661,8 @@ handle_cast({broadcast, Msg, _SizeHint},
State});
handle_cast({broadcast, Msg, SizeHint}, State) ->
- internal_broadcast(Msg, none, SizeHint, State);
+ {Result, State1} = internal_broadcast(Msg, SizeHint, State),
+ handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)});
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
@@ -685,8 +690,7 @@ handle_cast({validate_members, OldMembers},
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> noreply(State);
- _ -> Result = Module:members_changed(
- Args, Births, Deaths, NewMembers),
+ _ -> Result = Module:members_changed(Args, Births, Deaths),
handle_callback_result({Result, State})
end;
@@ -877,30 +881,18 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
ensure_broadcast_timer(State) ->
State.
-internal_broadcast(Msg, From, SizeHint,
+internal_broadcast(Msg, SizeHint,
State = #state { self = Self,
pub_count = PubCount,
module = Module,
- confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer,
broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
- Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount1, Msg} | Buffer],
- Confirms1 = case From of
- none -> Confirms;
- _ -> queue:in({PubCount1, From}, Confirms)
- end,
- State1 = State #state { pub_count = PubCount1,
- confirms = Confirms1,
- broadcast_buffer = Buffer1,
- broadcast_buffer_sz = BufferSize + SizeHint},
- handle_callback_result(
- {Result, case From of
- none -> maybe_flush_broadcast_buffer(State1);
- _ -> flush_broadcast_buffer(State1)
- end}).
+ {Module:handle_msg(Args, get_pid(Self), Msg),
+ State #state { pub_count = PubCount1,
+ broadcast_buffer = [{PubCount1, Msg} | Buffer],
+ broadcast_buffer_sz = BufferSize + SizeHint}}.
%% The Erlang distribution mechanism has an interesting quirk - it
%% will kill the VM cold with "Absurdly large distribution output data
@@ -1399,8 +1391,7 @@ callback_view_changed(Args, Module, OldView, NewView) ->
case {Births, Deaths} of
{[], []} -> ok;
_ -> Module:members_changed(
- Args, get_pids(Births), get_pids(Deaths),
- get_pids(NewMembers))
+ Args, get_pids(Births), get_pids(Deaths))
end.
handle_callback_result({Result, State}) ->
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 4ff1645a..c9a25522 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -17,7 +17,7 @@
-module(gm_soak_test).
-export([test/0]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -51,7 +51,7 @@ joined([], Members) ->
put(ts, now()),
ok.
-members_changed([], Births, Deaths, _Live) ->
+members_changed([], Births, Deaths) ->
with_state(
fun (State) ->
State1 =
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index fa515fa8..41be6dd8 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -17,7 +17,7 @@
-module(gm_speed_test).
-export([test/3]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-export([wile_e_coyote/2]).
-behaviour(gm).
@@ -30,7 +30,7 @@ joined(Owner, _Members) ->
Owner ! joined,
ok.
-members_changed(_Owner, _Births, _Deaths, _Live) ->
+members_changed(_Owner, _Births, _Deaths) ->
ok.
handle_msg(Owner, _From, ping) ->
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index 23b8f8cb..cae2164b 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -22,7 +22,7 @@
test_member_death/0,
test_receive_in_order/0,
all_tests/0]).
--export([joined/2, members_changed/4, handle_msg/3, terminate/2]).
+-export([joined/2, members_changed/3, handle_msg/3, terminate/2]).
-behaviour(gm).
@@ -40,7 +40,7 @@ joined(Pid, Members) ->
Pid ! {joined, self(), Members},
ok.
-members_changed(Pid, Births, Deaths, _Live) ->
+members_changed(Pid, Births, Deaths) ->
Pid ! {members_changed, self(), Births, Deaths},
ok.
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 7a352451..1ed6d710 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -446,10 +446,10 @@ supervisor(Pid) -> with_exit_handler(fun() -> dead end,
fun() -> delegate(Pid) end).
write(Group, Overall, ChildSpec) ->
- ok = mnesia:write(
- #mirrored_sup_childspec{key = {Group, id(ChildSpec)},
- mirroring_pid = Overall,
- childspec = ChildSpec}),
+ S = #mirrored_sup_childspec{key = {Group, id(ChildSpec)},
+ mirroring_pid = Overall,
+ childspec = ChildSpec},
+ ok = mnesia:write(?TABLE, S, write),
ChildSpec.
delete(Group, Id) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d38f8191..1aba7ecb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -45,8 +45,6 @@
-define(MORE_CONSUMER_CREDIT_AFTER, 50).
--define(FAILOVER_WAIT_MILLIS, 100).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -517,26 +515,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-%% We need to account for the idea that queues may be mid-promotion
-%% during force_event_refresh (since it's likely we're doing this in
-%% the first place since a node failed). Therefore we keep poking at
-%% the list of queues until we were able to talk to a live process or
-%% the queue no longer exists.
force_event_refresh(Ref) ->
- force_event_refresh([Q#amqqueue.name || Q <- list()], Ref).
-
-force_event_refresh(QNames, Ref) ->
- Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- {_, Bad} = gen_server2:mcall(
- [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]),
- FailedPids = [Pid || {Pid, _Reason} <- Bad],
- Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
- lists:member(Pid, FailedPids)],
- case Failed of
- [] -> ok;
- _ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
- force_event_refresh(Failed, Ref)
- end.
+ [gen_server2:cast(Q#amqqueue.pid,
+ {force_event_refresh, Ref}) || Q <- list()],
+ ok.
notify_policy_changed(#amqqueue{pid = QPid}) ->
gen_server2:cast(QPid, policy_changed).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5d3f3a12..9b785303 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1062,25 +1062,7 @@ handle_call(sync_mirrors, _From, State) ->
%% By definition if we get this message here we do not have to do anything.
handle_call(cancel_sync_mirrors, _From, State) ->
- reply({ok, not_syncing}, State);
-
-handle_call({force_event_refresh, Ref}, _From,
- State = #q{consumers = Consumers,
- exclusive_consumer = Exclusive}) ->
- rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
- QName = qname(State),
- AllConsumers = rabbit_queue_consumers:all(Consumers),
- case Exclusive of
- none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Prefetch,
- Args, Ref) ||
- {Ch, CTag, AckRequired, Prefetch, Args}
- <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
- emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
- end,
- reply(ok, State).
+ reply({ok, not_syncing}, State).
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1167,6 +1149,24 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
run_message_queue(true, State1)
end);
+handle_cast({force_event_refresh, Ref},
+ State = #q{consumers = Consumers,
+ exclusive_consumer = Exclusive}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
+ QName = qname(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
+ case Exclusive of
+ none -> [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref) ||
+ {Ch, CTag, AckRequired, Prefetch, Args}
+ <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
+ end,
+ noreply(State);
+
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2feeea5a..23718da1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -21,7 +21,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
--export([joined/2, members_changed/4, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, LiveGMPids},
+handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, LiveGMPids) of
+ QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -401,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, [], _Live) ->
+members_changed([_CPid], _Births, []) ->
ok;
-members_changed([CPid], _Births, _Deaths, Live) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Live}).
+members_changed([CPid], _Births, Deaths) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4bb923c4..2b16b911 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -183,7 +183,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, #state{name = QName, gm = GM}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
- MRefs = [erlang:monitor(process, SPid) || SPid <- SPids],
+ MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
%% Normally when we remove a slave another slave or master will
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index a2f4eec5..256543de 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -65,15 +65,8 @@
%%----------------------------------------------------------------------------
-%% If the dead pids include the queue pid (i.e. the master has died)
-%% then only remove that if we are about to be promoted. Otherwise we
-%% can have the situation where a slave updates the mnesia record for
-%% a queue, promoting another slave before that slave realises it has
-%% become the new master, which is bad because it could then mean the
-%% slave (now master) receives messages it's not ready for (for
-%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, LiveGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -83,40 +76,63 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {GMPids1, Dead} = lists:partition(
- fun ({GM, _}) ->
- lists:member(GM, LiveGMPids)
- end, GMPids),
- DeadPids = [Pid || {_GM, Pid} <- Dead],
- Alive = [QPid | SPids] -- DeadPids,
+ {DeadGM, AliveGM} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- DeadGM],
+ AlivePids = [Pid || {_GM, Pid} <- AliveGM],
+ Alive = [Pid || Pid <- [QPid | SPids],
+ lists:member(Pid, AlivePids)],
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- GMPids = GMPids1, %% ASSERTION
- {ok, QPid1, []};
+ ok;
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
- gm_pids = GMPids1},
+ gm_pids = AliveGM},
store_updated_slaves(Q1),
%% If we add and remove nodes at the same time we
%% might tell the old master we need to sync and
%% then shut it down. So let's check if the new
%% master needs to sync.
- maybe_auto_sync(Q1),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ maybe_auto_sync(Q1);
_ ->
- %% Master has changed, and we're not it,
- %% so leave alone to allow the promoted
- %% slave to find it and make its
- %% promotion atomic.
- {ok, QPid1, []}
- end
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1)
+ end,
+ {ok, QPid1, DeadPids}
end
end).
+%% [1] We still update mnesia here in case the slave that is supposed
+%% to become master dies before it does do so, in which case the dead
+%% old master might otherwise never get removed, which in turn might
+%% prevent promotion of another slave (e.g. us).
+%%
+%% Note however that we do not update the master pid. Otherwise we can
+%% have the situation where a slave updates the mnesia record for a
+%% queue, promoting another slave before that slave realises it has
+%% become the new master, which is bad because it could then mean the
+%% slave (now master) receives messages it's not ready for (for
+%% example, new consumers).
+%%
+%% We set slave_pids to Alive rather than SPids1 since otherwise we'd
+%% be removing the pid of the candidate master, which in turn would
+%% prevent it from promoting itself.
+%%
+%% We maintain gm_pids as our source of truth, i.e. it contains the
+%% most up-to-date information about which GMs and associated
+%% {M,S}Pids are alive. And all pids in slave_pids always have a
+%% corresponding entry in gm_pids. By contrast, due to the
+%% aforementioned restriction on updating the master pid, that pid may
+%% not be present in gm_pids, but only if said master has died.
on_node_up() ->
QNames =
@@ -203,13 +219,13 @@ start_child(Name, MirrorNode, Q, SyncMode) ->
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) ->
- log_info(QueueName, "~s ~s saw deaths of mirrors ~s~n",
+ log_info(QueueName, "~s ~s saw deaths of mirrors~s~n",
[case IsMaster of
true -> "Master";
false -> "Slave"
end,
rabbit_misc:pid_to_string(MirrorPid),
- [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]).
+ [[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]).
log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args).
log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ee9f7701..f6acd91a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -30,7 +30,7 @@
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
--export([joined/2, members_changed/4, handle_msg/3]).
+-export([joined/2, members_changed/3, handle_msg/3]).
-behaviour(gen_server2).
-behaviour(gm).
@@ -166,13 +166,13 @@ init_it(Self, GM, Node, QName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> GMPids = [T || T = {_, S} <- GMPids,
- S =/= SPid],
+ false -> GMPids1 = [T || T = {_, S} <- GMPids,
+ S =/= SPid],
Q1 = Q#amqqueue{
slave_pids = SPids -- [SPid],
- gm_pids = GMPids},
+ gm_pids = GMPids1},
add_slave(Q1, Self, GM),
- {new, QPid, GMPids}
+ {new, QPid, GMPids1}
end
end;
[] ->
@@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({gm_deaths, LiveGMPids}, From,
+handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -353,6 +353,9 @@ terminate_common(State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_pre_hibernate({not_started, _Q} = State) ->
+ {hibernate, State};
+
handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
{RamDuration, BQS1} = BQ:ram_duration(BQS),
@@ -365,7 +368,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Live} -> 5;
+ {gm_deaths, _Dead} -> 5;
_ -> 0
end.
@@ -393,10 +396,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, [], _Live) ->
+members_changed([_SPid], _Births, []) ->
ok;
-members_changed([ SPid], _Births, _Deaths, Live) ->
- inform_deaths(SPid, Live).
+members_changed([ SPid], _Births, Deaths) ->
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() ->
+ gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
+ end) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -421,14 +431,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Live) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
- ok -> ok;
- {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
- end.
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 53394155..d8ceeceb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -179,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
{<<"consumer_cancel_notify">>, bool, true},
{<<"connection.blocked">>, bool, true},
{<<"consumer_priorities">>, bool, true},
- {<<"authentication_failure_close">>, bool, true}];
+ {<<"authentication_failure_close">>, bool, true},
+ {<<"per_consumer_qos">>, bool, true}];
server_capabilities(_) ->
[].
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 0f265e22..b1dba5a2 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,7 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,7 +42,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
--spec(idle/1 :: (any()) -> 'ok').
+-spec(ready/1 :: (pid()) -> 'ok').
+-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -56,9 +57,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
- [{timeout, infinity}]).
+start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
submit(Fun) ->
case get(worker_pool_worker) of
@@ -67,64 +67,65 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
-submit_async(Fun) ->
- gen_server2:cast(?SERVER, {run_async, Fun}).
+submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
-idle(WId) ->
- gen_server2:cast(?SERVER, {idle, WId}).
+ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+
+idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({next_free, CPid}, From, State = #state { available = Avail,
- pending = Pending }) ->
- case queue:out(Avail) of
- {empty, _Avail} ->
- {noreply,
- State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
- {{value, WId}, Avail1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 },
- hibernate}
- end;
+handle_call({next_free, CPid}, From, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
+ hibernate};
+handle_call({next_free, CPid}, _From, State = #state { available =
+ [WPid | Avail1] }) ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = Avail,
- pending = Pending }) ->
+handle_cast({ready, WPid}, State) ->
+ erlang:monitor(process, WPid),
+ handle_cast({idle, WPid}, State);
+
+handle_cast({idle, WPid}, State = #state { available = Avail,
+ pending = Pending }) ->
{noreply,
- case queue:out(Avail) of
- {empty, _Avail} ->
- State #state { pending = queue:in({run_async, Fun}, Pending)};
- {{value, WId}, Avail1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { available = Avail1 }
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = ordsets:add_element(WPid, Avail) };
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
+ hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ {noreply, State #state { available = Avail1 }, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _MRef, process, WPid, _Reason},
+ State = #state { available = Avail }) ->
+ {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
+ hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-get_worker_pid(WId) ->
- [{WId, Pid, _Type, _Modules} | _] =
- lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
- when Id =:= WId -> false;
- (_) -> true
- end,
- supervisor:which_children(worker_pool_sup)),
- Pid.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 16c359a0..89d2ed46 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -49,5 +49,5 @@ init([WCount]) ->
{ok, {{one_for_one, 10, 10},
[{worker_pool, {worker_pool, start_link, []}, transient,
16#ffffffff, worker, [worker_pool]} |
- [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff,
+ [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,
worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 43673cb2..beb95bc6 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -31,7 +31,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
@@ -45,12 +45,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--record(state, {id, next}).
-
%%----------------------------------------------------------------------------
-start_link(WId) ->
- gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
@@ -71,45 +69,43 @@ run(Fun) ->
%%----------------------------------------------------------------------------
-init([WId]) ->
+init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(WId),
+ ok = worker_pool:ready(self()),
put(worker_pool_worker, true),
- {ok, #state{id = WId}, hibernate,
+ {ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
- {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+handle_call({submit, Fun, CPid}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun}, hibernate};
-handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
- id = WId}) ->
+handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
- {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+ {noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
- id = WId}) ->
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
-handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
+handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
- ok = worker_pool:idle(WId),
- {noreply, State, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
-handle_info({'DOWN', MRef, process, CPid, _Reason},
- State = #state{id = WId,
- next = {from, CPid, MRef}}) ->
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}};
+handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
- {noreply, State};
+ {noreply, State, hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.