summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-02-07 21:10:05 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-02-07 21:10:05 +0000
commit8e61223c2e104ba2813aacf462e8406d8e7e66fa (patch)
treeedfb3cecd173e948a6b922fd27ef2664473f2bfb
parent2fdee97e0f1cad03faf1827eb6f43ec78c1d3e1f (diff)
downloadrabbitmq-server-8e61223c2e104ba2813aacf462e8406d8e7e66fa.tar.gz
refactor: extract commonality between basic.deliver and basic.get
...and move the writer communication as early as possible, which improves latency.
-rw-r--r--src/rabbit_channel.erl95
1 files changed, 43 insertions, 52 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f17f98ca..76fbc73c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -295,29 +295,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -695,38 +685,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -1268,9 +1248,25 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
ok = notify_limiter(State#ch.limiter, Acked),
{noreply, State#ch{unacked_message_q = Remaining}}.
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1395,11 +1391,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->