diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:08:41 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:08:41 +0100 |
commit | e226e86c7d342ea53afae6a4db2d9d11efcca157 (patch) | |
tree | c86a4bc52cbc7a77a433b3d48cf92175d7661cdc | |
parent | 83497bbd0c37ddef8b07a1c8f82311ea9264ce61 (diff) | |
download | rabbitmq-server-e226e86c7d342ea53afae6a4db2d9d11efcca157.tar.gz |
add "unroutable message exchanges" feature
-rw-r--r-- | src/rabbit_channel.erl | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2716ec4..a7e3f2bb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -317,7 +317,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - {noreply, publish(Mandatory, Immediate, + {noreply, publish(Exchange, Mandatory, Immediate, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -767,15 +767,43 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(Mandatory, Immediate, Message, QPids, +publish(X, Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> - Handled = deliver(QPids, Mandatory, Immediate, TxnKey, + Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey, Message, WriterPid), case TxnKey of none -> State; _ -> add_tx_participants(Handled, State) end. +deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> + case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of + [] -> + case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of + {value, {_, longstr, UmeNameBin}} -> + XName = X#exchange.name, + UmeName = rabbit_misc:r(XName, exchange, UmeNameBin), + case rabbit_exchange:lookup(UmeName) of + {ok, Ume} -> + #basic_message{routing_key = RK, content = C} = + Message, + deliver(Ume, rabbit_exchange:route(Ume, RK, C), + false, false, Txn, Message, WriterPid); + {error, not_found} -> + rabbit_log:warning( + "unroutable message exchange for ~s " + "does not exist: ~s", + [rabbit_misc:rs(XName), + rabbit_misc:rs(UmeName)]), + [] + end; + false -> + [] + end; + Handled -> + Handled + end. + deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of {ok, DeliveredQPids} -> DeliveredQPids; |