summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-04-28 23:08:41 +0100
committerMatthias Radestock <matthias@lshift.net>2009-04-28 23:08:41 +0100
commite226e86c7d342ea53afae6a4db2d9d11efcca157 (patch)
treec86a4bc52cbc7a77a433b3d48cf92175d7661cdc
parent83497bbd0c37ddef8b07a1c8f82311ea9264ce61 (diff)
downloadrabbitmq-server-e226e86c7d342ea53afae6a4db2d9d11efcca157.tar.gz
add "unroutable message exchanges" feature
-rw-r--r--src/rabbit_channel.erl34
1 files changed, 31 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2716ec4..a7e3f2bb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -317,7 +317,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- {noreply, publish(Mandatory, Immediate,
+ {noreply, publish(Exchange, Mandatory, Immediate,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -767,15 +767,43 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(Mandatory, Immediate, Message, QPids,
+publish(X, Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
- Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
+ Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey,
Message, WriterPid),
case TxnKey of
none -> State;
_ -> add_tx_participants(Handled, State)
end.
+deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
+ case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of
+ [] ->
+ case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of
+ {value, {_, longstr, UmeNameBin}} ->
+ XName = X#exchange.name,
+ UmeName = rabbit_misc:r(XName, exchange, UmeNameBin),
+ case rabbit_exchange:lookup(UmeName) of
+ {ok, Ume} ->
+ #basic_message{routing_key = RK, content = C} =
+ Message,
+ deliver(Ume, rabbit_exchange:route(Ume, RK, C),
+ false, false, Txn, Message, WriterPid);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "unroutable message exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(UmeName)]),
+ []
+ end;
+ false ->
+ []
+ end;
+ Handled ->
+ Handled
+ end.
+
deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
{ok, DeliveredQPids} -> DeliveredQPids;