summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-20 17:52:48 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-20 17:52:48 +0100
commit84713a3a28622e097540be7b8ac0c3a055cf5500 (patch)
tree3acb17c370d7c7964ce98dc6c37b913f2aeab34b
parent5efb8249ddb1bead70a20f2f0ce3043b1529dee2 (diff)
downloadrabbitmq-server-84713a3a28622e097540be7b8ac0c3a055cf5500.tar.gz
carry 'mandatory' and 'immediate' flags through to UMEs
-rw-r--r--src/rabbit_exchange.erl46
1 files changed, 22 insertions, 24 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index eb45e4f3..4b5aea5c 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -190,34 +190,29 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
-sort_arguments(Arguments) ->
- lists:keysort(1, Arguments).
-
publish(X, Mandatory, Immediate, Txn,
Message = #basic_message{routing_key = RK, content = C}) ->
case rabbit_router:deliver(route(X, RK, C),
Mandatory, Immediate, Txn, Message) of
- {RoutingRes, []} -> DeliveredQPids = handle_unrouted(X, Txn, Message),
- {RoutingRes, DeliveredQPids};
- Other -> Other
- end.
-
-handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
- case rabbit_misc:r_arg(XName, exchange, Args, <<"ume">>) of
- undefined ->
- [];
- UmeName ->
- case lookup(UmeName) of
- {ok, Ume} ->
- {routed, DeliveredQPids} =
- publish(Ume, false, false, Txn, Message),
- DeliveredQPids;
- {error, not_found} ->
- rabbit_log:warning(
- "unroutable message exchange for ~s does not exist: ~s",
- [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]),
- []
- end
+ {_, []} = R ->
+ #exchange{name = XName, arguments = Args} = X,
+ case rabbit_misc:r_arg(XName, exchange, Args, <<"ume">>) of
+ undefined ->
+ R;
+ UmeName ->
+ case lookup(UmeName) of
+ {ok, Ume} ->
+ publish(Ume, Mandatory, Immediate, Txn, Message);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "unroutable message exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]),
+ R
+ end
+ end;
+ R ->
+ R
end.
%% return the list of qpids to which a message with a given routing
@@ -246,6 +241,9 @@ route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
route(X = #exchange{type = direct}, RoutingKey, _Content) ->
match_routing_key(X, RoutingKey).
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same exchange
match_bindings(#exchange{name = Name}, Match) ->