diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:22:54 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-04-28 23:22:54 +0100 |
commit | 98d90838d5810cf99f63224bf92e715c18da4659 (patch) | |
tree | 3a075f6366737cfed86c5277190cf7cc08c24503 | |
parent | e226e86c7d342ea53afae6a4db2d9d11efcca157 (diff) | |
download | rabbitmq-server-98d90838d5810cf99f63224bf92e715c18da4659.tar.gz |
refactoring
-rw-r--r-- | src/rabbit_channel.erl | 32 |
1 files changed, 14 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a7e3f2bb..4a3cb285 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -306,7 +306,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{ virtual_host = VHostPath}) -> + Content, State = #ch{ virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -317,12 +319,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - {noreply, publish(Exchange, Mandatory, Immediate, + Handled = publish(Exchange, Mandatory, Immediate, TxnKey, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; + WriterPid), + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(Handled, State) + end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -767,17 +773,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(X, Mandatory, Immediate, Message, QPids, - State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> - Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey, - Message, WriterPid), - case TxnKey of - none -> State; - _ -> add_tx_participants(Handled, State) - end. - -deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> - case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of +publish(X, Mandatory, Immediate, Txn, + Message = #basic_message{routing_key = RK, content = C}, WriterPid) -> + case deliver(rabbit_exchange:route(X, RK, C), Mandatory, Immediate, + Txn, Message, WriterPid) of [] -> case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of {value, {_, longstr, UmeNameBin}} -> @@ -785,10 +784,7 @@ deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> UmeName = rabbit_misc:r(XName, exchange, UmeNameBin), case rabbit_exchange:lookup(UmeName) of {ok, Ume} -> - #basic_message{routing_key = RK, content = C} = - Message, - deliver(Ume, rabbit_exchange:route(Ume, RK, C), - false, false, Txn, Message, WriterPid); + publish(Ume, false, false, Txn, Message, WriterPid); {error, not_found} -> rabbit_log:warning( "unroutable message exchange for ~s " |