From d34d84aa046ffb30ed50c98596ad071f773bde07 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 28 Nov 2010 11:15:20 +0000 Subject: experimental single-frame publish/deliver commands --- src/rabbit_channel.erl | 50 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 5 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6bed63a3..0cbfa9a1 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -474,6 +474,44 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, _ -> add_tx_participants(DeliveredQPids, State) end}; +handle_method(#'basic.send'{exchange = ExchangeNameBin, + routing_key = RoutingKey, + mandatory = Mandatory, + immediate = Immediate, + content = Payload}, + _, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + writer_pid = WriterPid}) -> + Content = rabbit_basic:build_content(#'P_basic'{}, Payload), + ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + %% We decode the content's properties here because we're almost + %% certain to want to look at delivery-mode and priority. + DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + IsPersistent = is_message_persistent(DecodedContent), + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + case RoutingRes of + routed -> ok; + unroutable -> ok = basic_return(Message, WriterPid, no_route); + not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) + end, + maybe_incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish, State), + {noreply, case TxnKey of + none -> State; + _ -> add_tx_participants(DeliveredQPids, State) + end}; + handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, @@ -1116,16 +1154,18 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, - content = Content}}) -> - M = #'basic.deliver'{consumer_tag = ConsumerTag, + content = #content{ + payload_fragments_rev = [Payload]}}}) -> + M = #'basic.receive'{consumer_tag = ConsumerTag, delivery_tag = DeliveryTag, redelivered = Redelivered, exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, + routing_key = RoutingKey, + content = Payload}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, self(), M, Content); - false -> rabbit_writer:send_command(WriterPid, M, Content) + WriterPid, QPid, self(), M); + false -> rabbit_writer:send_command(WriterPid, M) end. terminate(_State) -> -- cgit v1.2.1