summaryrefslogtreecommitdiff
path: root/src/rabbit_exchange.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r--src/rabbit_exchange.erl139
1 files changed, 78 insertions, 61 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 40bee25f..06fd819c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -93,9 +93,9 @@
recover() ->
Exs = rabbit_misc:table_fold(
- fun (Exchange, Acc) ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
- [Exchange | Acc]
+ fun (X, Acc) ->
+ ok = mnesia:write(rabbit_exchange, X, write),
+ [X | Acc]
end, [], rabbit_durable_exchange),
Bs = rabbit_binding:recover(),
recover_with_bindings(
@@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) ->
recover_with_bindings([], [], []) ->
ok.
-declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
- Exchange = #exchange{name = ExchangeName,
- type = Type,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args},
+declare(XName, Type, Durable, AutoDelete, Args) ->
+ X = #exchange{name = XName,
+ type = Type,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args},
%% We want to upset things if it isn't ok; this is different from
%% the other hooks invocations, where we tend to ignore the return
%% value.
TypeModule = type_to_module(Type),
- ok = TypeModule:validate(Exchange),
+ ok = TypeModule:validate(X),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ case mnesia:wread({rabbit_exchange, XName}) of
[] ->
- ok = mnesia:write(rabbit_exchange, Exchange, write),
+ ok = mnesia:write(rabbit_exchange, X, write),
ok = case Durable of
true ->
mnesia:write(rabbit_durable_exchange,
- Exchange, write);
+ X, write);
false ->
ok
end,
- {new, Exchange};
+ {new, X};
[ExistingX] ->
{existing, ExistingX}
end
@@ -225,52 +225,69 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-publish(X, Delivery) ->
- publish(X, [], Delivery).
-
-publish(X = #exchange{type = Type}, Seen, Delivery) ->
- case (type_to_module(Type)):publish(X, Delivery) of
- {_, []} = R ->
- #exchange{name = XName, arguments = Args} = X,
- case rabbit_misc:r_arg(XName, exchange, Args,
- <<"alternate-exchange">>) of
- undefined ->
- R;
- AName ->
- NewSeen = [XName | Seen],
- case lists:member(AName, NewSeen) of
- true -> R;
- false -> case lookup(AName) of
- {ok, AX} ->
- publish(AX, NewSeen, Delivery);
- {error, not_found} ->
- rabbit_log:warning(
- "alternate exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(AName)]),
- R
- end
- end
- end;
- R ->
- R
+publish(X = #exchange{name = XName}, Delivery) ->
+ QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []),
+ QueuePids = lookup_qpids(QueueNames),
+ rabbit_router:deliver(QueuePids, Delivery).
+
+find_queues(Delivery, WorkList, SeenExchanges, QueueNames) ->
+ case queue:out(WorkList) of
+ {empty, _WorkList} ->
+ lists:usort(lists:flatten(QueueNames));
+ {{value, X = #exchange{type = Type}}, WorkList1} ->
+ {NewQueueNames, NewExchangeNames} =
+ process_alternate(
+ X, ((type_to_module(Type)):publish(X, Delivery))),
+ {WorkList2, SeenExchanges1} =
+ lists:foldl(
+ fun (XName, {WorkListN, SeenExchangesN} = Acc) ->
+ case lists:member(XName, SeenExchangesN) of
+ true -> Acc;
+ false -> {case lookup(XName) of
+ {ok, X1} ->
+ queue:in(X1, WorkListN);
+ {error, not_found} ->
+ WorkListN
+ end, [XName | SeenExchangesN]}
+ end
+ end, {WorkList1, SeenExchanges}, NewExchangeNames),
+ find_queues(Delivery, WorkList2, SeenExchanges1,
+ [NewQueueNames | QueueNames])
end.
-call_with_exchange(Exchange, Fun) ->
+process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) ->
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of
+ undefined ->
+ {[], []};
+ AName ->
+ {[], [AName]}
+ end;
+process_alternate(_X, Results) ->
+ Results.
+
+lookup_qpids(QueueNames) ->
+ lists:foldl(
+ fun (Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], QueueNames).
+
+call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:read({rabbit_exchange, Exchange}) of
+ fun () -> case mnesia:read({rabbit_exchange, XName}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
end).
-delete(ExchangeName, IfUnused) ->
+delete(XName, IfUnused) ->
Fun = case IfUnused of
true -> fun conditional_delete/1;
false -> fun unconditional_delete/1
end,
- case call_with_exchange(ExchangeName, Fun) of
+ case call_with_exchange(XName, Fun) of
{deleted, X = #exchange{type = Type}, Bs} ->
(type_to_module(Type)):delete(X, Bs),
ok;
@@ -280,21 +297,21 @@ delete(ExchangeName, IfUnused) ->
maybe_auto_delete(#exchange{auto_delete = false}) ->
not_deleted;
-maybe_auto_delete(#exchange{auto_delete = true} = Exchange) ->
- case conditional_delete(Exchange) of
- {error, in_use} -> not_deleted;
- {deleted, Exchange, []} -> auto_deleted
+maybe_auto_delete(#exchange{auto_delete = true} = X) ->
+ case conditional_delete(X) of
+ {error, in_use} -> not_deleted;
+ {deleted, X, []} -> auto_deleted
end.
-conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- case rabbit_binding:has_for_exchange(ExchangeName) of
- false -> unconditional_delete(Exchange);
+conditional_delete(X = #exchange{name = XName}) ->
+ case rabbit_binding:has_for_exchange(XName) of
+ false -> unconditional_delete(X);
true -> {error, in_use}
end.
-unconditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- Bindings = rabbit_binding:remove_for_exchange(ExchangeName),
- ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
- ok = mnesia:delete({rabbit_exchange, ExchangeName}),
- rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]),
- {deleted, Exchange, Bindings}.
+unconditional_delete(X = #exchange{name = XName}) ->
+ Bindings = rabbit_binding:remove_for_exchange(XName),
+ ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ ok = mnesia:delete({rabbit_exchange, XName}),
+ rabbit_event:notify(exchange_deleted, [{name, XName}]),
+ {deleted, X, Bindings}.