diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-07 21:10:05 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-02-07 21:10:05 +0000 |
commit | 8e61223c2e104ba2813aacf462e8406d8e7e66fa (patch) | |
tree | edfb3cecd173e948a6b922fd27ef2664473f2bfb | |
parent | 2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff) | |
download | rabbitmq-server-8e61223c2e104ba2813aacf462e8406d8e7e66fa.tar.gz |
refactor: extract commonality between basic.deliver and basic.get
...and move the writer communication as early as possible, which
improves latency.
-rw-r--r-- | src/rabbit_channel.erl | 95 |
1 files changed, 43 insertions, 52 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f17f98ca..76fbc73c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -295,29 +295,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> - State1 = lock_message(AckRequired, - ack_record(DeliveryTag, ConsumerTag, Msg), - State), - - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); - + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + ok = rabbit_writer:send_command_and_notify( + WriterPid, QPid, self(), + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -695,38 +685,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}} -> - State1 = lock_message(not(NoAck), - ack_record(DeliveryTag, none, Msg), - State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}} -> ok = rabbit_writer:send_command( WriterPid, - #'basic.get_ok'{delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, record_sent(none, not(NoAck), Msg, State)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -1268,9 +1248,25 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> ok = notify_limiter(State#ch.limiter, Acked), {noreply, State#ch{unacked_message_q = Remaining}}. -ack_record(DeliveryTag, ConsumerTag, - _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> - {DeliveryTag, ConsumerTag, {QPid, MsgId}}. +record_sent(ConsumerTag, AckRequired, + Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + State = #ch{unacked_message_q = UAMQ, + next_tag = DeliveryTag, + trace_state = TraceState}) -> + maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), + rabbit_trace:tap_trace_out(Msg, TraceState), + UAMQ1 = case AckRequired of + true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + UAMQ); + false -> UAMQ + end, + State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. collect_acks(Q, 0, true) -> {queue:to_list(Q), queue:new()}; @@ -1395,11 +1391,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> end end, State#ch{unconfirmed_mq = UMQ1}, QPids). -lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> - State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; -lock_message(false, _MsgStruct, State) -> - State. - send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> |