summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-04-28 23:22:54 +0100
committerMatthias Radestock <matthias@lshift.net>2009-04-28 23:22:54 +0100
commit98d90838d5810cf99f63224bf92e715c18da4659 (patch)
tree3a075f6366737cfed86c5277190cf7cc08c24503
parente226e86c7d342ea53afae6a4db2d9d11efcca157 (diff)
downloadrabbitmq-server-98d90838d5810cf99f63224bf92e715c18da4659.tar.gz
refactoring
-rw-r--r--src/rabbit_channel.erl32
1 files changed, 14 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a7e3f2bb..4a3cb285 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -306,7 +306,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory,
immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath}) ->
+ Content, State = #ch{ virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -317,12 +319,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- {noreply, publish(Exchange, Mandatory, Immediate,
+ Handled = publish(Exchange, Mandatory, Immediate, TxnKey,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
+ WriterPid),
+ {noreply, case TxnKey of
+ none -> State;
+ _ -> add_tx_participants(Handled, State)
+ end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -767,17 +773,10 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(X, Mandatory, Immediate, Message, QPids,
- State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
- Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey,
- Message, WriterPid),
- case TxnKey of
- none -> State;
- _ -> add_tx_participants(Handled, State)
- end.
-
-deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
- case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of
+publish(X, Mandatory, Immediate, Txn,
+ Message = #basic_message{routing_key = RK, content = C}, WriterPid) ->
+ case deliver(rabbit_exchange:route(X, RK, C), Mandatory, Immediate,
+ Txn, Message, WriterPid) of
[] ->
case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of
{value, {_, longstr, UmeNameBin}} ->
@@ -785,10 +784,7 @@ deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
UmeName = rabbit_misc:r(XName, exchange, UmeNameBin),
case rabbit_exchange:lookup(UmeName) of
{ok, Ume} ->
- #basic_message{routing_key = RK, content = C} =
- Message,
- deliver(Ume, rabbit_exchange:route(Ume, RK, C),
- false, false, Txn, Message, WriterPid);
+ publish(Ume, false, false, Txn, Message, WriterPid);
{error, not_found} ->
rabbit_log:warning(
"unroutable message exchange for ~s "