diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-11-08 14:22:03 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-11-08 14:22:03 +0000 |
commit | 40ce346c130a2baf3ccfb564dd0ca24c6423f316 (patch) | |
tree | ee7da8de620a7818e084145a9253196e593196e5 | |
parent | 4bdce4d49de9fe81ba27cdda158214d198ce53ae (diff) | |
download | rabbitmq-server-40ce346c130a2baf3ccfb564dd0ca24c6423f316.tar.gz |
First hacky attempt at fixing deadlocks involving slave_sup. Also includes a hacky attempt at bug 25870 (dying worker pool clients can starve the worker pool of processes) since that's currently necessary.
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 24 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 38 | ||||
-rw-r--r-- | src/worker_pool.erl | 29 |
3 files changed, 51 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index e6559841..47a44278 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -19,7 +19,8 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1]). + is_mirrored/1, update_mirrors/2, validate_policy/1, + maybe_auto_sync/1]). %% for testing only -export([module/1]). @@ -183,26 +184,15 @@ add_mirror(QName, MirrorNode) -> end end). -start_child(Name, MirrorNode, Q) -> +start_child(_Name, MirrorNode, Q) -> + %% TODO re-add some log stuff here. case rabbit_misc:with_exit_handler( - rabbit_misc:const({ok, down}), + rabbit_misc:const(down), fun () -> rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) end) of - {ok, SPid} when is_pid(SPid) -> - maybe_auto_sync(Q), - rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - {ok, started}; - {error, {{stale_master_pid, StalePid}, _}} -> - rabbit_log:warning("Detected stale HA master while adding " - "mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, StalePid]), - {ok, stale_master}; - {error, {{duplicate_live_master, _}=Err, _}} -> - Err; - Other -> - Other + {ok, SPid} -> rabbit_mirror_queue_slave:go(SPid); + _ -> ok end. report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 6f78d1d2..694dcd9a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -24,7 +24,7 @@ %% All instructions from the GM group must be processed in the order %% in which they're received. --export([start_link/1, set_maximum_since_use/2, info/1]). +-export([start_link/1, set_maximum_since_use/2, info/1, go/1]). 
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1, prioritise_call/4, @@ -78,7 +78,14 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(Q = #amqqueue { name = QName }) -> +init(Q) -> + {ok, {not_started, Q}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}. + +go(SPid) -> gen_server2:cast(SPid, go). + +handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -124,22 +131,24 @@ init(Q = #amqqueue { name = QName }) -> }, ok = gm:broadcast(GM, request_depth), ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]), - {ok, State, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}}; + rabbit_mirror_queue_misc:maybe_auto_sync(Q1), + {noreply, State}; {stale, StalePid} -> - {stop, {stale_master_pid, StalePid}}; + gm:leave(GM), + {stop, {stale_master_pid, StalePid}, NotStarted}; duplicate_live_master -> - {stop, {duplicate_live_master, Node}}; + gm:leave(GM), + {stop, {duplicate_live_master, Node}, NotStarted}; existing -> gm:leave(GM), - ignore; + {stop, normal, NotStarted}; master_in_recovery -> + gm:leave(GM), %% The queue record vanished - we must have a master starting %% concurrently with us. In that case we can safely decide to do %% nothing here, and the master will start us in %% master:init_with_existing_bq/3 - ignore + {stop, normal, NotStarted} end. init_it(Self, GM, Node, QName) -> @@ -208,6 +217,9 @@ handle_call({gm_deaths, LiveGMPids}, From, handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State). 
+handle_cast(go, {not_started, _Q} = NotStarted) -> + handle_go(NotStarted); + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -293,6 +305,10 @@ handle_info({bump_credit, Msg}, State) -> handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. +terminate(_, {not_started, _Q}) -> + %% TODO we accept gm:leave/1 related garblings here, that '_' + %% should be a 'normal' + ok; terminate(_Reason, #state { backing_queue_state = undefined }) -> %% We've received a delete_and_terminate from gm, thus nothing to %% do here. @@ -403,7 +419,9 @@ handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). inform_deaths(SPid, Live) -> - case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of ok -> ok; {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} end. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 488db5ec..52b22075 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -63,7 +63,7 @@ start_link() -> submit(Fun) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), worker_pool_worker:submit(Pid, Fun) end. @@ -79,12 +79,12 @@ init([]) -> {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. 
-handle_call(next_free, From, State = #state { available = Avail, +handle_call({next_free, Pid}, From, State = #state { available = Avail, pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> {noreply, - State #state { pending = queue:in({next_free, From}, Pending) }, + State #state { pending = queue:in({next_free, From, Pid}, Pending) }, hibernate}; {{value, WId}, Avail1} -> {reply, get_worker_pid(WId), State #state { available = Avail1 }, @@ -96,16 +96,19 @@ handle_call(Msg, _From, State) -> handle_cast({idle, WId}, State = #state { available = Avail, pending = Pending }) -> - {noreply, case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From}}, Pending1} -> - gen_server2:reply(From, get_worker_pid(WId)), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(get_worker_pid(WId), Fun), - State #state { pending = Pending1 } - end, hibernate}; + case queue:out(Pending) of + {empty, _Pending} -> + {noreply, State #state { available = queue:in(WId, Avail) }, hibernate}; + {{value, {next_free, From, Pid}}, Pending1} -> + case is_process_alive(Pid) of + true -> gen_server2:reply(From, get_worker_pid(WId)), + {noreply, State #state { pending = Pending1 }, hibernate}; + false -> handle_cast({idle, WId}, State#state{pending = Pending1}) + end; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + {noreply, State #state { pending = Pending1 }, hibernate} + end; handle_cast({run_async, Fun}, State = #state { available = Avail, pending = Pending }) -> |