summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-05 12:25:38 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-05 12:25:38 +0100
commitd374681d9e83c0758ad071e075e2795695f3e325 (patch)
tree021fc6d5058f651d79c2dd8d481f0d51d431ed7f /deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
parente4addbbc5e60c775e357f869b9513c7b6270793a (diff)
parent3ff1fe7f6d6233f512c1f8d78dbe88a99c928146 (diff)
downloadrabbitmq-server-git-d374681d9e83c0758ad071e075e2795695f3e325.tar.gz
Merge in default
Diffstat (limited to 'deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl')
-rw-r--r--deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl60
1 files changed, 35 insertions, 25 deletions
diff --git a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
index 082f2859d7..2bf158ede1 100644
--- a/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
+++ b/deps/rabbitmq_federation/src/rabbit_federation_link_sup.erl
@@ -21,25 +21,25 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_federation.hrl").
-%% Supervises the upstream links for an exchange.
+%% Supervises the upstream links for an exchange or queue.
-export([start_link/1, adjust/3]).
-export([init/1]).
-start_link(X) ->
- supervisor2:start_link(?MODULE, X).
+start_link(XorQ) ->
+ supervisor2:start_link(?MODULE, XorQ).
-adjust(Sup, X, everything) ->
- [stop(Sup, Upstream, X) ||
+adjust(Sup, XorQ, everything) ->
+ [stop(Sup, Upstream, XorQ) ||
{Upstream, _, _, _} <- supervisor2:which_children(Sup)],
- [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(X)];
+ [{ok, _Pid} = supervisor2:start_child(Sup, Spec) || Spec <- specs(XorQ)];
-adjust(Sup, X, {upstream, UpstreamName}) ->
+adjust(Sup, XorQ, {upstream, UpstreamName}) ->
OldUpstreams0 = children(Sup, UpstreamName),
- NewUpstreams0 = rabbit_federation_upstream:for(X, UpstreamName),
+ NewUpstreams0 = rabbit_federation_upstream:for(XorQ, UpstreamName),
%% If any haven't changed, don't restart them. The broker will
%% avoid telling us about connections that have not changed
- %% syntactically, but even if one has, this X may not have that
+ %% syntactically, but even if one has, this XorQ may not have that
%% connection in an upstream, so we still need to check here.
{OldUpstreams, NewUpstreams} =
lists:foldl(
@@ -49,13 +49,13 @@ adjust(Sup, X, {upstream, UpstreamName}) ->
false -> {OldUs, NewUs}
end
end, {OldUpstreams0, NewUpstreams0}, OldUpstreams0),
- [stop(Sup, OldUpstream, X) || OldUpstream <- OldUpstreams],
- [start(Sup, NewUpstream, X) || NewUpstream <- NewUpstreams];
+ [stop(Sup, OldUpstream, XorQ) || OldUpstream <- OldUpstreams],
+ [start(Sup, NewUpstream, XorQ) || NewUpstream <- NewUpstreams];
-adjust(Sup, X = #exchange{name = XName}, {clear_upstream, UpstreamName}) ->
+adjust(Sup, XorQ, {clear_upstream, UpstreamName}) ->
ok = rabbit_federation_db:prune_scratch(
- XName, rabbit_federation_upstream:for(X)),
- [stop(Sup, Upstream, X) || Upstream <- children(Sup, UpstreamName)];
+ name(XorQ), rabbit_federation_upstream:for(XorQ)),
+ [stop(Sup, Upstream, XorQ) || Upstream <- children(Sup, UpstreamName)];
%% TODO handle changes of upstream sets minimally (bug 24853)
adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) ->
@@ -65,14 +65,16 @@ adjust(Sup, X = #exchange{name = XName}, {upstream_set, Set}) ->
XName, rabbit_federation_upstream:for(X))
end,
adjust(Sup, X, everything);
-adjust(Sup, X, {clear_upstream_set, _}) ->
- adjust(Sup, X, everything).
+adjust(Sup, Q = #amqqueue{}, {upstream_set, _}) ->
+ adjust(Sup, Q, everything);
+adjust(Sup, XorQ, {clear_upstream_set, _}) ->
+ adjust(Sup, XorQ, everything).
-start(Sup, Upstream, X) ->
- {ok, _Pid} = supervisor2:start_child(Sup, spec(Upstream, X)),
+start(Sup, Upstream, XorQ) ->
+ {ok, _Pid} = supervisor2:start_child(Sup, spec(Upstream, XorQ)),
ok.
-stop(Sup, Upstream, #exchange{name = XName}) ->
+stop(Sup, Upstream, XorQ) ->
ok = supervisor2:terminate_child(Sup, Upstream),
ok = supervisor2:delete_child(Sup, Upstream),
%% While the link will report its own removal, that only works if
@@ -80,7 +82,7 @@ stop(Sup, Upstream, #exchange{name = XName}) ->
%% come up, the possibility exists that there *is* no link
%% process, but we still have a report in the status table. So
%% remove it here too.
- rabbit_federation_status:remove(Upstream, XName).
+ rabbit_federation_status:remove(Upstream, name(XorQ)).
children(Sup, UpstreamName) ->
rabbit_federation_util:find_upstreams(
@@ -88,15 +90,23 @@ children(Sup, UpstreamName) ->
%%----------------------------------------------------------------------------
-init(X) ->
+init(XorQ) ->
%% 1, 1 so that the supervisor can give up and get into waiting
%% for the reconnect_delay quickly.
- {ok, {{one_for_one, 1, 1}, specs(X)}}.
+ {ok, {{one_for_one, 1, 1}, specs(XorQ)}}.
-specs(X) ->
- [spec(Upstream, X) || Upstream <- rabbit_federation_upstream:for(X)].
+specs(XorQ) ->
+ [spec(Upstream, XorQ) || Upstream <- rabbit_federation_upstream:for(XorQ)].
spec(Upstream = #upstream{reconnect_delay = Delay}, #exchange{name = XName}) ->
{Upstream, {rabbit_federation_link, start_link, [{Upstream, XName}]},
{permanent, Delay}, ?MAX_WAIT, worker,
- [rabbit_federation_link]}.
+ [rabbit_federation_link]};
+
+spec(Upstream = #upstream{reconnect_delay = Delay}, Q = #amqqueue{}) ->
+ {Upstream, {rabbit_federation_queue_link, start_link, [{Upstream, Q}]},
+ {permanent, Delay}, ?MAX_WAIT, worker,
+ [rabbit_federation_queue_link]}.
+
+name(#exchange{name = XName}) -> XName;
+name(#amqqueue{name = QName}) -> QName.