diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-05-20 17:52:48 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-05-20 17:52:48 +0100 |
commit | 84713a3a28622e097540be7b8ac0c3a055cf5500 (patch) | |
tree | 3acb17c370d7c7964ce98dc6c37b913f2aeab34b | |
parent | 5efb8249ddb1bead70a20f2f0ce3043b1529dee2 (diff) | |
download | rabbitmq-server-84713a3a28622e097540be7b8ac0c3a055cf5500.tar.gz |
carry 'mandatory' and 'immediate' flags through to UMEs
-rw-r--r-- | src/rabbit_exchange.erl | 46 |
1 files changed, 22 insertions, 24 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index eb45e4f3..4b5aea5c 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -190,34 +190,29 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). -sort_arguments(Arguments) -> - lists:keysort(1, Arguments). - publish(X, Mandatory, Immediate, Txn, Message = #basic_message{routing_key = RK, content = C}) -> case rabbit_router:deliver(route(X, RK, C), Mandatory, Immediate, Txn, Message) of - {RoutingRes, []} -> DeliveredQPids = handle_unrouted(X, Txn, Message), - {RoutingRes, DeliveredQPids}; - Other -> Other - end. - -handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) -> - case rabbit_misc:r_arg(XName, exchange, Args, <<"ume">>) of - undefined -> - []; - UmeName -> - case lookup(UmeName) of - {ok, Ume} -> - {routed, DeliveredQPids} = - publish(Ume, false, false, Txn, Message), - DeliveredQPids; - {error, not_found} -> - rabbit_log:warning( - "unroutable message exchange for ~s does not exist: ~s", - [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]), - [] - end + {_, []} = R -> + #exchange{name = XName, arguments = Args} = X, + case rabbit_misc:r_arg(XName, exchange, Args, <<"ume">>) of + undefined -> + R; + UmeName -> + case lookup(UmeName) of + {ok, Ume} -> + publish(Ume, Mandatory, Immediate, Txn, Message); + {error, not_found} -> + rabbit_log:warning( + "unroutable message exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]), + R + end + end; + R -> + R end. %% return the list of qpids to which a message with a given routing @@ -246,6 +241,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route(X = #exchange{type = direct}, RoutingKey, _Content) -> match_routing_key(X, RoutingKey). +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same exchange match_bindings(#exchange{name = Name}, Match) -> |