diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-05 12:25:38 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-05 12:25:38 +0100 |
commit | d374681d9e83c0758ad071e075e2795695f3e325 (patch) | |
tree | 021fc6d5058f651d79c2dd8d481f0d51d431ed7f /deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl | |
parent | e4addbbc5e60c775e357f869b9513c7b6270793a (diff) | |
parent | 3ff1fe7f6d6233f512c1f8d78dbe88a99c928146 (diff) | |
download | rabbitmq-server-git-d374681d9e83c0758ad071e075e2795695f3e325.tar.gz |
Merge in default
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl')
-rw-r--r-- | deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl | 60 |
1 files changed, 35 insertions, 25 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl index 082f2859d7..2bf158ede1 100644 --- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl +++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl @@ -21,25 +21,25 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include("rabbit_federation.hrl"). -%% Supervises the upstream links for an exchange. +%% Supervises the upstream links for an exchange or queue. -export([start_link/1, adjust/3]). -export([init/1]). -start_link(X) -> - supervisor2:start_link(?MODULE, X). +start_link(XorQ) -> + supervisor2:start_link(?MODULE, XorQ). -adjust(Sup, X, everything) -> - [stop(Sup, Upstream, X) || +adjust(Sup, XorQ, everything) -> + [stop(Sup, Upstream, XorQ) || {Upstream, _, _, _} <- supervisor2:which_children(Sup)], - [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(X)]; + [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(XorQ)]; -adjust(Sup, X, {upstream, UpstreamName}) -> +adjust(Sup, XorQ, {upstream, UpstreamName}) -> OldUpstreams0 = children(Sup, UpstreamName), - NewUpstreams0 = rabbit_federation_upstream:for(X, UpstreamName), + NewUpstreams0 = rabbit_federation_upstream:for(XorQ, UpstreamName), %% If any haven't changed, don't restart them. The broker will %% avoid telling us about connections that have not changed - %% syntactically, but even if one has, this X may not have that + %% syntactically, but even if one has, this XorQ may not have that %% connection in an upstream, so we still need to check here. {OldUpstreams, NewUpstreams} = lists:foldl( @@ -49,13 +49,13 @@ adjust(Sup, X, {upstream, UpstreamName}) -> false -> {OldUs, NewUs} end end, {OldUpstreams0, NewUpstreams0}, OldUpstreams0), - [stop(Sup, OldUpstream, X) || OldUpstream <- OldUpstreams], - [start(Sup, NewUpstream, X) || NewUpstream <- NewUpstreams]; + [stop(Sup, OldUpstream, XorQ) || OldUpstream <- OldUpstreams], + [start(Sup, NewUpstream, XorQ) || NewUpstream <- NewUpstreams]; -adjust(Sup, X = #exchange{name = XName}, {clear_upstream, UpstreamName}) -> +adjust(Sup, XorQ, {clear_upstream, UpstreamName}) -> ok = rabbit_federation_db:prune_scratch( - XName, rabbit_federation_upstream:for(X)), - [stop(Sup, Upstream, X) || Upstream <- children(Sup, UpstreamName)]; + name(XorQ), rabbit_federation_upstream:for(XorQ)), + [stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)]; %% TODO handle changes of upstream sets minimally (bug 24853) adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) -> @@ -65,14 +65,16 @@ adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) -> XName, rabbit_federation_upstream:for(X)) end, adjust(Sup, X, everything); -adjust(Sup, X, {clear_upstream_set, _}) -> - adjust(Sup, X, everything). +adjust(Sup, Q = #amqqueue{}, {upstream_set, _}) -> + adjust(Sup, Q, everything); +adjust(Sup, XorQ, {clear_upstream_set, _}) -> + adjust(Sup, XorQ, everything). -start(Sup, Upstream, X) -> - {ok, _Pid} = supervisor2:start_child(Sup, spec(Upstream, X)), +start(Sup, Upstream, XorQ) -> + {ok, _Pid} = supervisor2:start_child(Sup, spec(Upstream, XorQ)), ok. -stop(Sup, Upstream, #exchange{name = XName}) -> +stop(Sup, Upstream, XorQ) -> ok = supervisor2:terminate_child(Sup, Upstream), ok = supervisor2:delete_child(Sup, Upstream), %% While the link will report its own removal, that only works if @@ -80,7 +82,7 @@ stop(Sup, Upstream, #exchange{name = XName}) -> %% come up, the possibility exists that there *is* no link %% process, but we still have a report in the status table. So %% remove it here too. - rabbit_federation_status:remove(Upstream, XName). + rabbit_federation_status:remove(Upstream, name(XorQ)). children(Sup, UpstreamName) -> rabbit_federation_util:find_upstreams( @@ -88,15 +90,23 @@ children(Sup, UpstreamName) -> %%---------------------------------------------------------------------------- -init(X) -> +init(XorQ) -> %% 1, 1 so that the supervisor can give up and get into waiting %% for the reconnect_delay quickly. - {ok, {{one_for_one, 1, 1}, specs(X)}}. + {ok, {{one_for_one, 1, 1}, specs(XorQ)}}. -specs(X) -> - [spec(Upstream, X) || Upstream <- rabbit_federation_upstream:for(X)]. +specs(XorQ) -> + [spec(Upstream, XorQ) || Upstream <- rabbit_federation_upstream:for(XorQ)]. spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) -> {Upstream, {rabbit_federation_link, start_link, [{Upstream, XName}]}, {permanent, Delay}, ?MAX_WAIT, worker, - [rabbit_federation_link]}. + [rabbit_federation_link]}; + +spec(Upstream = #upstream{reconnect_delay = Delay}, Q = #amqqueue{}) -> + {Upstream, {rabbit_federation_queue_link, start_link, [{Upstream, Q}]}, + {permanent, Delay}, ?MAX_WAIT, worker, + [rabbit_federation_queue_link]}. + +name(#exchange{name = XName}) -> XName; +name(#amqqueue{name = QName}) -> QName. |