diff options
Diffstat (limited to 'src/rabbit_exchange.erl')
-rw-r--r-- | src/rabbit_exchange.erl | 139 |
1 files changed, 78 insertions, 61 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 40bee25f..06fd819c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -93,9 +93,9 @@ recover() -> Exs = rabbit_misc:table_fold( - fun (Exchange, Acc) -> - ok = mnesia:write(rabbit_exchange, Exchange, write), - [Exchange | Acc] + fun (X, Acc) -> + ok = mnesia:write(rabbit_exchange, X, write), + [X | Acc] end, [], rabbit_durable_exchange), Bs = rabbit_binding:recover(), recover_with_bindings( @@ -112,30 +112,30 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(ExchangeName, Type, Durable, AutoDelete, Args) -> - Exchange = #exchange{name = ExchangeName, - type = Type, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args}, +declare(XName, Type, Durable, AutoDelete, Args) -> + X = #exchange{name = XName, + type = Type, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args}, %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. TypeModule = type_to_module(Type), - ok = TypeModule:validate(Exchange), + ok = TypeModule:validate(X), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({rabbit_exchange, ExchangeName}) of + case mnesia:wread({rabbit_exchange, XName}) of [] -> - ok = mnesia:write(rabbit_exchange, Exchange, write), + ok = mnesia:write(rabbit_exchange, X, write), ok = case Durable of true -> mnesia:write(rabbit_durable_exchange, - Exchange, write); + X, write); false -> ok end, - {new, Exchange}; + {new, X}; [ExistingX] -> {existing, ExistingX} end @@ -225,52 +225,69 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -publish(X, Delivery) -> - publish(X, [], Delivery). - -publish(X = #exchange{type = Type}, Seen, Delivery) -> - case (type_to_module(Type)):publish(X, Delivery) of - {_, []} = R -> - #exchange{name = XName, arguments = Args} = X, - case rabbit_misc:r_arg(XName, exchange, Args, - <<"alternate-exchange">>) of - undefined -> - R; - AName -> - NewSeen = [XName | Seen], - case lists:member(AName, NewSeen) of - true -> R; - false -> case lookup(AName) of - {ok, AX} -> - publish(AX, NewSeen, Delivery); - {error, not_found} -> - rabbit_log:warning( - "alternate exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(AName)]), - R - end - end - end; - R -> - R +publish(X = #exchange{name = XName}, Delivery) -> + QueueNames = find_queues(Delivery, queue:from_list([X]), [XName], []), + QueuePids = lookup_qpids(QueueNames), + rabbit_router:deliver(QueuePids, Delivery). + +find_queues(Delivery, WorkList, SeenExchanges, QueueNames) -> + case queue:out(WorkList) of + {empty, _WorkList} -> + lists:usort(lists:flatten(QueueNames)); + {{value, X = #exchange{type = Type}}, WorkList1} -> + {NewQueueNames, NewExchangeNames} = + process_alternate( + X, ((type_to_module(Type)):publish(X, Delivery))), + {WorkList2, SeenExchanges1} = + lists:foldl( + fun (XName, {WorkListN, SeenExchangesN} = Acc) -> + case lists:member(XName, SeenExchangesN) of + true -> Acc; + false -> {case lookup(XName) of + {ok, X1} -> + queue:in(X1, WorkListN); + {error, not_found} -> + WorkListN + end, [XName | SeenExchangesN]} + end + end, {WorkList1, SeenExchanges}, NewExchangeNames), + find_queues(Delivery, WorkList2, SeenExchanges1, + [NewQueueNames | QueueNames]) end. -call_with_exchange(Exchange, Fun) -> +process_alternate(#exchange{name = XName, arguments = Args}, {[], []}) -> + case rabbit_misc:r_arg(XName, exchange, Args, <<"alternate-exchange">>) of + undefined -> + {[], []}; + AName -> + {[], [AName]} + end; +process_alternate(_X, Results) -> + Results. + +lookup_qpids(QueueNames) -> + lists:foldl( + fun (Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], QueueNames). + +call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( - fun () -> case mnesia:read({rabbit_exchange, Exchange}) of + fun () -> case mnesia:read({rabbit_exchange, XName}) of [] -> {error, not_found}; [X] -> Fun(X) end end). -delete(ExchangeName, IfUnused) -> +delete(XName, IfUnused) -> Fun = case IfUnused of true -> fun conditional_delete/1; false -> fun unconditional_delete/1 end, - case call_with_exchange(ExchangeName, Fun) of + case call_with_exchange(XName, Fun) of {deleted, X = #exchange{type = Type}, Bs} -> (type_to_module(Type)):delete(X, Bs), ok; @@ -280,21 +297,21 @@ delete(ExchangeName, IfUnused) -> maybe_auto_delete(#exchange{auto_delete = false}) -> not_deleted; -maybe_auto_delete(#exchange{auto_delete = true} = Exchange) -> - case conditional_delete(Exchange) of - {error, in_use} -> not_deleted; - {deleted, Exchange, []} -> auto_deleted +maybe_auto_delete(#exchange{auto_delete = true} = X) -> + case conditional_delete(X) of + {error, in_use} -> not_deleted; + {deleted, X, []} -> auto_deleted end. -conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case rabbit_binding:has_for_exchange(ExchangeName) of - false -> unconditional_delete(Exchange); +conditional_delete(X = #exchange{name = XName}) -> + case rabbit_binding:has_for_exchange(XName) of + false -> unconditional_delete(X); true -> {error, in_use} end. -unconditional_delete(Exchange = #exchange{name = ExchangeName}) -> - Bindings = rabbit_binding:remove_for_exchange(ExchangeName), - ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), - ok = mnesia:delete({rabbit_exchange, ExchangeName}), - rabbit_event:notify(exchange_deleted, [{name, ExchangeName}]), - {deleted, Exchange, Bindings}. +unconditional_delete(X = #exchange{name = XName}) -> + Bindings = rabbit_binding:remove_for_exchange(XName), + ok = mnesia:delete({rabbit_durable_exchange, XName}), + ok = mnesia:delete({rabbit_exchange, XName}), + rabbit_event:notify(exchange_deleted, [{name, XName}]), + {deleted, X, Bindings}. |