summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Simon MacMullen <simon@rabbitmq.com> 2013-11-08 14:22:03 +0000
committer: Simon MacMullen <simon@rabbitmq.com> 2013-11-08 14:22:03 +0000
commit: 40ce346c130a2baf3ccfb564dd0ca24c6423f316 (patch)
tree: ee7da8de620a7818e084145a9253196e593196e5
parent: 4bdce4d49de9fe81ba27cdda158214d198ce53ae (diff)
download: rabbitmq-server-40ce346c130a2baf3ccfb564dd0ca24c6423f316.tar.gz
First hacky attempt at fixing deadlocks involving slave_sup. Also includes a hacky attempt at bug 25870 (dying worker pool clients can starve the worker pool of processes) since that's currently necessary.
-rw-r--r--  src/rabbit_mirror_queue_misc.erl   24
-rw-r--r--  src/rabbit_mirror_queue_slave.erl  38
-rw-r--r--  src/worker_pool.erl                29
3 files changed, 51 insertions, 40 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index e6559841..47a44278 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -19,7 +19,8 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1]).
+ is_mirrored/1, update_mirrors/2, validate_policy/1,
+ maybe_auto_sync/1]).
%% for testing only
-export([module/1]).
@@ -183,26 +184,15 @@ add_mirror(QName, MirrorNode) ->
end
end).
-start_child(Name, MirrorNode, Q) ->
+start_child(_Name, MirrorNode, Q) ->
+ %% TODO re-add some log stuff here.
case rabbit_misc:with_exit_handler(
- rabbit_misc:const({ok, down}),
+ rabbit_misc:const(down),
fun () ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
- {ok, SPid} when is_pid(SPid) ->
- maybe_auto_sync(Q),
- rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- {ok, started};
- {error, {{stale_master_pid, StalePid}, _}} ->
- rabbit_log:warning("Detected stale HA master while adding "
- "mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, StalePid]),
- {ok, stale_master};
- {error, {{duplicate_live_master, _}=Err, _}} ->
- Err;
- Other ->
- Other
+ {ok, SPid} -> rabbit_mirror_queue_slave:go(SPid);
+ _ -> ok
end.
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6f78d1d2..694dcd9a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -24,7 +24,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2, info/1]).
+-export([start_link/1, set_maximum_since_use/2, info/1, go/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/4,
@@ -78,7 +78,14 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(Q = #amqqueue { name = QName }) ->
+init(Q) ->
+ {ok, {not_started, Q}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}}.
+
+go(SPid) -> gen_server2:cast(SPid, go).
+
+handle_go({not_started, Q = #amqqueue { name = QName }} = NotStarted) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -124,22 +131,24 @@ init(Q = #amqqueue { name = QName }) ->
},
ok = gm:broadcast(GM, request_depth),
ok = gm:validate_members(GM, [GM | [G || {G, _} <- GMPids]]),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE}};
+ rabbit_mirror_queue_misc:maybe_auto_sync(Q1),
+ {noreply, State};
{stale, StalePid} ->
- {stop, {stale_master_pid, StalePid}};
+ gm:leave(GM),
+ {stop, {stale_master_pid, StalePid}, NotStarted};
duplicate_live_master ->
- {stop, {duplicate_live_master, Node}};
+ gm:leave(GM),
+ {stop, {duplicate_live_master, Node}, NotStarted};
existing ->
gm:leave(GM),
- ignore;
+ {stop, normal, NotStarted};
master_in_recovery ->
+ gm:leave(GM),
%% The queue record vanished - we must have a master starting
%% concurrently with us. In that case we can safely decide to do
%% nothing here, and the master will start us in
%% master:init_with_existing_bq/3
- ignore
+ {stop, normal, NotStarted}
end.
init_it(Self, GM, Node, QName) ->
@@ -208,6 +217,9 @@ handle_call({gm_deaths, LiveGMPids}, From,
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State).
+handle_cast(go, {not_started, _Q} = NotStarted) ->
+ handle_go(NotStarted);
+
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -293,6 +305,10 @@ handle_info({bump_credit, Msg}, State) ->
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
+terminate(_, {not_started, _Q}) ->
+ %% TODO we accept gm:leave/1 related garblings here, that '_'
+ %% should be a 'normal'
+ ok;
terminate(_Reason, #state { backing_queue_state = undefined }) ->
%% We've received a delete_and_terminate from gm, thus nothing to
%% do here.
@@ -403,7 +419,9 @@ handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
inform_deaths(SPid, Live) ->
- case gen_server2:call(SPid, {gm_deaths, Live}, infinity) of
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
ok -> ok;
{promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
end.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 488db5ec..52b22075 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -63,7 +63,7 @@ start_link() ->
submit(Fun) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun)
end.
@@ -79,12 +79,12 @@ init([]) ->
{ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(next_free, From, State = #state { available = Avail,
+handle_call({next_free, Pid}, From, State = #state { available = Avail,
pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
{noreply,
- State #state { pending = queue:in({next_free, From}, Pending) },
+ State #state { pending = queue:in({next_free, From, Pid}, Pending) },
hibernate};
{{value, WId}, Avail1} ->
{reply, get_worker_pid(WId), State #state { available = Avail1 },
@@ -96,16 +96,19 @@ handle_call(Msg, _From, State) ->
handle_cast({idle, WId}, State = #state { available = Avail,
pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From}}, Pending1} ->
- gen_server2:reply(From, get_worker_pid(WId)),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ {noreply, State #state { available = queue:in(WId, Avail) }, hibernate};
+ {{value, {next_free, From, Pid}}, Pending1} ->
+ case is_process_alive(Pid) of
+ true -> gen_server2:reply(From, get_worker_pid(WId)),
+ {noreply, State #state { pending = Pending1 }, hibernate};
+ false -> handle_cast({idle, WId}, State#state{pending = Pending1})
+ end;
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ {noreply, State #state { pending = Pending1 }, hibernate}
+ end;
handle_cast({run_async, Fun}, State = #state { available = Avail,
pending = Pending }) ->