diff options
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | include/gm_specs.hrl | 3 | ||||
-rw-r--r-- | src/gm.erl | 43 | ||||
-rw-r--r-- | src/gm_soak_test.erl | 4 | ||||
-rw-r--r-- | src/gm_speed_test.erl | 4 | ||||
-rw-r--r-- | src/gm_tests.erl | 4 | ||||
-rw-r--r-- | src/mirrored_supervisor.erl | 8 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 24 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 38 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 70 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 40 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 3 | ||||
-rw-r--r-- | src/worker_pool.erl | 104 | ||||
-rw-r--r-- | src/worker_pool_sup.erl | 2 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 54 |
17 files changed, 200 insertions, 220 deletions
@@ -194,6 +194,11 @@ run: all RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ ./scripts/rabbitmq-server +run-background: all + $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ + ./scripts/rabbitmq-server -detached + run-node: all $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl index f4ea0df8..245c23bc 100644 --- a/include/gm_specs.hrl +++ b/include/gm_specs.hrl @@ -21,8 +21,7 @@ -type(members() :: [pid()]). -spec(joined/2 :: (args(), members()) -> callback_result()). --spec(members_changed/4 :: (args(), members(), - members(), members()) -> callback_result()). +-spec(members_changed/3 :: (args(), members(), members()) -> callback_result()). -spec(handle_msg/3 :: (args(), pid(), any()) -> callback_result()). -spec(terminate/2 :: (args(), term()) -> any()). @@ -476,8 +476,8 @@ %% joined/2 before receiving any messages from it; and (2) we will not %% see members die that we have not seen born (or supplied in the %% members to joined/2). --callback members_changed(Args :: term(), Births :: [pid()], - Deaths :: [pid()], Live :: [pid()]) -> +-callback members_changed(Args :: term(), + Births :: [pid()], Deaths :: [pid()]) -> ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}. %% Supplied with Args provided in start_link, the sender, and the @@ -496,7 +496,7 @@ -else. behaviour_info(callbacks) -> - [{joined, 2}, {members_changed, 4}, {handle_msg, 3}, {terminate, 2}]; + [{joined, 2}, {members_changed, 3}, {handle_msg, 3}, {terminate, 2}]; behaviour_info(_Other) -> undefined. @@ -581,7 +581,11 @@ handle_call({confirmed_broadcast, Msg}, _From, ok, State}); handle_call({confirmed_broadcast, Msg}, From, State) -> - internal_broadcast(Msg, From, 0, State); + {Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} = + internal_broadcast(Msg, 0, State), + Confirms1 = queue:in({PubCount, From}, Confirms), + handle_callback_result({Result, flush_broadcast_buffer( + State1 #state { confirms = Confirms1 })}); handle_call(info, _From, State = #state { members_state = undefined }) -> @@ -657,7 +661,8 @@ handle_cast({broadcast, Msg, _SizeHint}, State}); handle_cast({broadcast, Msg, SizeHint}, State) -> - internal_broadcast(Msg, none, SizeHint, State); + {Result, State1} = internal_broadcast(Msg, SizeHint, State), + handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)}); handle_cast(join, State = #state { self = Self, group_name = GroupName, @@ -685,8 +690,7 @@ handle_cast({validate_members, OldMembers}, Deaths = OldMembers -- NewMembers, case {Births, Deaths} of {[], []} -> noreply(State); - _ -> Result = Module:members_changed( - Args, Births, Deaths, NewMembers), + _ -> Result = Module:members_changed(Args, Births, Deaths), handle_callback_result({Result, State}) end; @@ -877,30 +881,18 @@ ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) -> ensure_broadcast_timer(State) -> State. -internal_broadcast(Msg, From, SizeHint, +internal_broadcast(Msg, SizeHint, State = #state { self = Self, pub_count = PubCount, module = Module, - confirms = Confirms, callback_args = Args, broadcast_buffer = Buffer, broadcast_buffer_sz = BufferSize }) -> PubCount1 = PubCount + 1, - Result = Module:handle_msg(Args, get_pid(Self), Msg), - Buffer1 = [{PubCount1, Msg} | Buffer], - Confirms1 = case From of - none -> Confirms; - _ -> queue:in({PubCount1, From}, Confirms) - end, - State1 = State #state { pub_count = PubCount1, - confirms = Confirms1, - broadcast_buffer = Buffer1, - broadcast_buffer_sz = BufferSize + SizeHint}, - handle_callback_result( - {Result, case From of - none -> maybe_flush_broadcast_buffer(State1); - _ -> flush_broadcast_buffer(State1) - end}). + {Module:handle_msg(Args, get_pid(Self), Msg), + State #state { pub_count = PubCount1, + broadcast_buffer = [{PubCount1, Msg} | Buffer], + broadcast_buffer_sz = BufferSize + SizeHint}}. %% The Erlang distribution mechanism has an interesting quirk - it %% will kill the VM cold with "Absurdly large distribution output data @@ -1399,8 +1391,7 @@ callback_view_changed(Args, Module, OldView, NewView) -> case {Births, Deaths} of {[], []} -> ok; _ -> Module:members_changed( - Args, get_pids(Births), get_pids(Deaths), - get_pids(NewMembers)) + Args, get_pids(Births), get_pids(Deaths)) end. handle_callback_result({Result, State}) -> diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index 4ff1645a..c9a25522 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -17,7 +17,7 @@ -module(gm_soak_test). -export([test/0]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -behaviour(gm). @@ -51,7 +51,7 @@ joined([], Members) -> put(ts, now()), ok. -members_changed([], Births, Deaths, _Live) -> +members_changed([], Births, Deaths) -> with_state( fun (State) -> State1 = diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl index fa515fa8..41be6dd8 100644 --- a/src/gm_speed_test.erl +++ b/src/gm_speed_test.erl @@ -17,7 +17,7 @@ -module(gm_speed_test). -export([test/3]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -export([wile_e_coyote/2]). -behaviour(gm). @@ -30,7 +30,7 @@ joined(Owner, _Members) -> Owner ! joined, ok. -members_changed(_Owner, _Births, _Deaths, _Live) -> +members_changed(_Owner, _Births, _Deaths) -> ok. handle_msg(Owner, _From, ping) -> diff --git a/src/gm_tests.erl b/src/gm_tests.erl index 23b8f8cb..cae2164b 100644 --- a/src/gm_tests.erl +++ b/src/gm_tests.erl @@ -22,7 +22,7 @@ test_member_death/0, test_receive_in_order/0, all_tests/0]). --export([joined/2, members_changed/4, handle_msg/3, terminate/2]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). -behaviour(gm). @@ -40,7 +40,7 @@ joined(Pid, Members) -> Pid ! {joined, self(), Members}, ok. -members_changed(Pid, Births, Deaths, _Live) -> +members_changed(Pid, Births, Deaths) -> Pid ! {members_changed, self(), Births, Deaths}, ok. diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 7a352451..1ed6d710 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -446,10 +446,10 @@ supervisor(Pid) -> with_exit_handler(fun() -> dead end, fun() -> delegate(Pid) end). write(Group, Overall, ChildSpec) -> - ok = mnesia:write( - #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, - mirroring_pid = Overall, - childspec = ChildSpec}), + S = #mirrored_sup_childspec{key = {Group, id(ChildSpec)}, + mirroring_pid = Overall, + childspec = ChildSpec}, + ok = mnesia:write(?TABLE, S, write), ChildSpec. delete(Group, Id) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d38f8191..1aba7ecb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -45,8 +45,6 @@ -define(MORE_CONSUMER_CREDIT_AFTER, 50). --define(FAILOVER_WAIT_MILLIS, 100). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -517,26 +515,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -%% We need to account for the idea that queues may be mid-promotion -%% during force_event_refresh (since it's likely we're doing this in -%% the first place since a node failed). Therefore we keep poking at -%% the list of queues until we were able to talk to a live process or -%% the queue no longer exists. force_event_refresh(Ref) -> - force_event_refresh([Q#amqqueue.name || Q <- list()], Ref). - -force_event_refresh(QNames, Ref) -> - Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - {_, Bad} = gen_server2:mcall( - [{Q#amqqueue.pid, {force_event_refresh, Ref}} || Q <- Qs]), - FailedPids = [Pid || {Pid, _Reason} <- Bad], - Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs, - lists:member(Pid, FailedPids)], - case Failed of - [] -> ok; - _ -> timer:sleep(?FAILOVER_WAIT_MILLIS), - force_event_refresh(Failed, Ref) - end. + [gen_server2:cast(Q#amqqueue.pid, + {force_event_refresh, Ref}) || Q <- list()], + ok. notify_policy_changed(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, policy_changed). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5d3f3a12..9b785303 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1062,25 +1062,7 @@ handle_call(sync_mirrors, _From, State) -> %% By definition if we get this message here we do not have to do anything. handle_call(cancel_sync_mirrors, _From, State) -> - reply({ok, not_syncing}, State); - -handle_call({force_event_refresh, Ref}, _From, - State = #q{consumers = Consumers, - exclusive_consumer = Exclusive}) -> - rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), - QName = qname(State), - AllConsumers = rabbit_queue_consumers:all(Consumers), - case Exclusive of - none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Prefetch, - Args, Ref) || - {Ch, CTag, AckRequired, Prefetch, Args} - <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, - emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) - end, - reply(ok, State). + reply({ok, not_syncing}, State). handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1167,6 +1149,24 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, run_message_queue(true, State1) end); +handle_cast({force_event_refresh, Ref}, + State = #q{consumers = Consumers, + exclusive_consumer = Exclusive}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), + QName = qname(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), + case Exclusive of + none -> [emit_consumer_created( + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, + emit_consumer_created( + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) + end, + noreply(State); + handle_cast(notify_decorators, State) -> notify_decorators(State), noreply(State); diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2feeea5a..23718da1 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -21,7 +21,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([joined/2, members_changed/4, handle_msg/3]). +-export([joined/2, members_changed/3, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). -handle_cast({gm_deaths, LiveGMPids}, +handle_cast({gm_deaths, DeadGMPids}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue( - QueueName, MPid, LiveGMPids) of + QueueName, MPid, DeadGMPids) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), @@ -401,10 +401,10 @@ joined([CPid], Members) -> CPid ! {joined, self(), Members}, ok. -members_changed([_CPid], _Births, [], _Live) -> +members_changed([_CPid], _Births, []) -> ok; -members_changed([CPid], _Births, _Deaths, Live) -> - ok = gen_server2:cast(CPid, {gm_deaths, Live}). +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4bb923c4..2b16b911 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -183,7 +183,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, #state{name = QName, gm = GM}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), - MRefs = [erlang:monitor(process, SPid) || SPid <- SPids], + MRefs = [erlang:monitor(process, Pid) || Pid <- [GM | SPids]], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], %% Normally when we remove a slave another slave or master will diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index a2f4eec5..256543de 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -65,15 +65,8 @@ %%---------------------------------------------------------------------------- -%% If the dead pids include the queue pid (i.e. the master has died) -%% then only remove that if we are about to be promoted. Otherwise we -%% can have the situation where a slave updates the mnesia record for -%% a queue, promoting another slave before that slave realises it has -%% become the new master, which is bad because it could then mean the -%% slave (now master) receives messages it's not ready for (for -%% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, Self, LiveGMPids) -> +remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -83,40 +76,63 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] -> - {GMPids1, Dead} = lists:partition( - fun ({GM, _}) -> - lists:member(GM, LiveGMPids) - end, GMPids), - DeadPids = [Pid || {_GM, Pid} <- Dead], - Alive = [QPid | SPids] -- DeadPids, + {DeadGM, AliveGM} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- DeadGM], + AlivePids = [Pid || {_GM, Pid} <- AliveGM], + Alive = [Pid || Pid <- [QPid | SPids], + lists:member(Pid, AlivePids)], {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - GMPids = GMPids1, %% ASSERTION - {ok, QPid1, []}; + ok; _ when QPid =:= QPid1 orelse QPid1 =:= Self -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. Q1 = Q#amqqueue{pid = QPid1, slave_pids = SPids1, - gm_pids = GMPids1}, + gm_pids = AliveGM}, store_updated_slaves(Q1), %% If we add and remove nodes at the same time we %% might tell the old master we need to sync and %% then shut it down. So let's check if the new %% master needs to sync. - maybe_auto_sync(Q1), - {ok, QPid1, [QPid | SPids] -- Alive}; + maybe_auto_sync(Q1); _ -> - %% Master has changed, and we're not it, - %% so leave alone to allow the promoted - %% slave to find it and make its - %% promotion atomic. - {ok, QPid1, []} - end + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, + store_updated_slaves(Q1) + end, + {ok, QPid1, DeadPids} end end). +%% [1] We still update mnesia here in case the slave that is supposed +%% to become master dies before it does do so, in which case the dead +%% old master might otherwise never get removed, which in turn might +%% prevent promotion of another slave (e.g. us). +%% +%% Note however that we do not update the master pid. Otherwise we can +%% have the situation where a slave updates the mnesia record for a +%% queue, promoting another slave before that slave realises it has +%% become the new master, which is bad because it could then mean the +%% slave (now master) receives messages it's not ready for (for +%% example, new consumers). +%% +%% We set slave_pids to Alive rather than SPids1 since otherwise we'd +%% be removing the pid of the candidate master, which in turn would +%% prevent it from promoting itself. +%% +%% We maintain gm_pids as our source of truth, i.e. it contains the +%% most up-to-date information about which GMs and associated +%% {M,S}Pids are alive. And all pids in slave_pids always have a +%% corresponding entry in gm_pids. By contrast, due to the +%% aforementioned restriction on updating the master pid, that pid may +%% not be present in gm_pids, but only if said master has died. on_node_up() -> QNames = @@ -203,13 +219,13 @@ start_child(Name, MirrorNode, Q, SyncMode) -> report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; report_deaths(MirrorPid, IsMaster, QueueName, DeadPids) -> - log_info(QueueName, "~s ~s saw deaths of mirrors ~s~n", + log_info(QueueName, "~s ~s saw deaths of mirrors~s~n", [case IsMaster of true -> "Master"; false -> "Slave" end, rabbit_misc:pid_to_string(MirrorPid), - [[rabbit_misc:pid_to_string(P), $ ] || P <- DeadPids]]). + [[$ , rabbit_misc:pid_to_string(P)] || P <- DeadPids]]). log_info (QName, Fmt, Args) -> log(info, QName, Fmt, Args). log_warning(QName, Fmt, Args) -> log(warning, QName, Fmt, Args). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index ee9f7701..f6acd91a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -30,7 +30,7 @@ code_change/3, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). --export([joined/2, members_changed/4, handle_msg/3]). +-export([joined/2, members_changed/3, handle_msg/3]). -behaviour(gen_server2). -behaviour(gm). @@ -166,13 +166,13 @@ init_it(Self, GM, Node, QName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> GMPids = [T || T = {_, S} <- GMPids, - S =/= SPid], + false -> GMPids1 = [T || T = {_, S} <- GMPids, + S =/= SPid], Q1 = Q#amqqueue{ slave_pids = SPids -- [SPid], - gm_pids = GMPids}, + gm_pids = GMPids1}, add_slave(Q1, Self, GM), - {new, QPid, GMPids} + {new, QPid, GMPids1} end end; [] -> @@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({gm_deaths, LiveGMPids}, From, +handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), - case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; @@ -353,6 +353,9 @@ terminate_common(State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate({not_started, _Q} = State) -> + {hibernate, State}; + handle_pre_hibernate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> {RamDuration, BQS1} = BQ:ram_duration(BQS), @@ -365,7 +368,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; - {gm_deaths, _Live} -> 5; + {gm_deaths, _Dead} -> 5; _ -> 0 end. @@ -393,10 +396,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, [], _Live) -> +members_changed([_SPid], _Births, []) -> ok; -members_changed([ SPid], _Births, _Deaths, Live) -> - inform_deaths(SPid, Live). +members_changed([ SPid], _Births, Deaths) -> + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> + gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) + end) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master @@ -421,14 +431,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). -inform_deaths(SPid, Live) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(ok), - fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of - ok -> ok; - {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} - end. - %% --------------------------------------------------------------------------- %% Others %% --------------------------------------------------------------------------- diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 53394155..d8ceeceb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -179,7 +179,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"consumer_priorities">>, bool, true}, - {<<"authentication_failure_close">>, bool, true}]; + {<<"authentication_failure_close">>, bool, true}, + {<<"per_consumer_qos">>, bool, true}]; server_capabilities(_) -> []. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 0f265e22..b1dba5a2 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,7 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -42,7 +42,8 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). --spec(idle/1 :: (any()) -> 'ok'). +-spec(ready/1 :: (pid()) -> 'ok'). +-spec(idle/1 :: (pid()) -> 'ok'). -endif. @@ -56,9 +57,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], - [{timeout, infinity}]). +start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], + [{timeout, infinity}]). submit(Fun) -> case get(worker_pool_worker) of @@ -67,64 +67,65 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. -submit_async(Fun) -> - gen_server2:cast(?SERVER, {run_async, Fun}). +submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). -idle(WId) -> - gen_server2:cast(?SERVER, {idle, WId}). +ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}). + +idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). %%---------------------------------------------------------------------------- init([]) -> - {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, + {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({next_free, CPid}, From, State = #state { available = Avail, - pending = Pending }) -> - case queue:out(Avail) of - {empty, _Avail} -> - {noreply, - State#state{pending = queue:in({next_free, From, CPid}, Pending)}, - hibernate}; - {{value, WId}, Avail1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - {reply, WPid, State #state { available = Avail1 }, - hibernate} - end; +handle_call({next_free, CPid}, From, State = #state { available = [], + pending = Pending }) -> + {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, + hibernate}; +handle_call({next_free, CPid}, _From, State = #state { available = + [WPid | Avail1] }) -> + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({idle, WId}, State = #state { available = Avail, - pending = Pending }) -> - {noreply, case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From, CPid}}, Pending1} -> - WPid = get_worker_pid(WId), - worker_pool_worker:next_job_from(WPid, CPid), - gen_server2:reply(From, WPid), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { pending = Pending1 } - end, hibernate}; - -handle_cast({run_async, Fun}, State = #state { available = Avail, - pending = Pending }) -> +handle_cast({ready, WPid}, State) -> + erlang:monitor(process, WPid), + handle_cast({idle, WPid}, State); + +handle_cast({idle, WPid}, State = #state { available = Avail, + pending = Pending }) -> {noreply, - case queue:out(Avail) of - {empty, _Avail} -> - State #state { pending = queue:in({run_async, Fun}, Pending)}; - {{value, WId}, Avail1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { available = Avail1 } + case queue:out(Pending) of + {empty, _Pending} -> + State #state { available = ordsets:add_element(WPid, Avail) }; + {{value, {next_free, From, CPid}}, Pending1} -> + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(WPid, Fun), + State #state { pending = Pending1 } end, hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [], + pending = Pending }) -> + {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, + hibernate}; +handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> + worker_pool_worker:submit_async(WPid, Fun), + {noreply, State #state { available = Avail1 }, hibernate}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', _MRef, process, WPid, _Reason}, + State = #state { available = Avail }) -> + {noreply, State #state { available = ordsets:del_element(WPid, Avail) }, + hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. - -%%---------------------------------------------------------------------------- - -get_worker_pid(WId) -> - [{WId, Pid, _Type, _Modules} | _] = - lists:dropwhile(fun ({Id, _Pid, _Type, _Modules}) - when Id =:= WId -> false; - (_) -> true - end, - supervisor:which_children(worker_pool_sup)), - Pid. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 16c359a0..89d2ed46 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -49,5 +49,5 @@ init([WCount]) -> {ok, {{one_for_one, 10, 10}, [{worker_pool, {worker_pool, start_link, []}, transient, 16#ffffffff, worker, [worker_pool]} | - [{N, {worker_pool_worker, start_link, [N]}, transient, 16#ffffffff, + [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff, worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 43673cb2..beb95bc6 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -31,7 +31,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). @@ -45,12 +45,10 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --record(state, {id, next}). - %%---------------------------------------------------------------------------- -start_link(WId) -> - gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). @@ -71,45 +69,43 @@ run(Fun) -> %%---------------------------------------------------------------------------- -init([WId]) -> +init([]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:idle(WId), + ok = worker_pool:ready(self()), put(worker_pool_worker, true), - {ok, #state{id = WId}, hibernate, + {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> - {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; +handle_call({submit, Fun, CPid}, From, undefined) -> + {noreply, {job, CPid, From, Fun}, hibernate}; -handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, - id = WId}) -> +handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> +handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), - {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, - id = WId}) -> +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; -handle_cast({submit_async, Fun}, State = #state{id = WId}) -> +handle_cast({submit_async, Fun}, undefined) -> run(Fun), - ok = worker_pool:idle(WId), - {noreply, State, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. -handle_info({'DOWN', MRef, process, CPid, _Reason}, - State = #state{id = WId, - next = {from, CPid, MRef}}) -> - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}}; +handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> - {noreply, State}; + {noreply, State, hibernate}; handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |