summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-02-08 18:04:57 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-02-08 18:04:57 +0000
commit22871e540c28aab4cb2cab1184f6cdb83dfe100f (patch)
tree589886fdb16c2845f0f3d1d35134b87c63855f8c
parent0535af7856437bc36fd37c30521dc47bff91e644 (diff)
parent0e74226fc10d381c15eb1c4e68fa4bdd3643133f (diff)
downloadrabbitmq-server-22871e540c28aab4cb2cab1184f6cdb83dfe100f.tar.gz
Merged bug24505 into default
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_channel.erl140
-rw-r--r--src/rabbit_misc.erl25
3 files changed, 117 insertions, 103 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c4921510..5629b836 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) ->
{_, _} -> ok
end.
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ {true, State};
+deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case PredFun(FunAcc, State) of
- false -> {FunAcc, State};
- true -> case queue:out(ActiveConsumers) of
- {empty, _} ->
- {FunAcc, State};
- {{value, QEntry}, Tail} ->
- {FunAcc1, State1} =
- deliver_msg_to_consumer(
+ case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {false, State};
+ {{value, QEntry}, Tail} ->
+ {Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry,
- FunAcc, State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(Funs, FunAcc1, State1)
- end
+ State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {FunAcc, State};
+ {false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
- {FunAcc, State};
+ {false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C, FunAcc,
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end.
@@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
+ State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, Stop, State1} =
+ DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {FunAcc1, State1}.
-
-deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
+ {Stop, State1}.
-deliver_from_queue_deliver(AckRequired, false, State) ->
+deliver_from_queue_deliver(AckRequired, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
@@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
- Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
- IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = ChPid,
@@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false,
- State1 = #q{backing_queue_state = BQS2}) ->
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
%% we don't need an expiry here because
%% messages are not being enqueued, so we use
%% an empty message_properties.
@@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State1#q{backing_queue_state = BQS3}}
end,
{Delivered, State2} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ deliver_msgs_to_consumers(DeliverFun, false,
State#q{backing_queue_state = BQS1}),
{Delivered, Confirm, State2};
{Duplicate, BQS1} ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index af8d4c3a..a101886f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -295,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -695,38 +686,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- maybe_incr_redeliver_stats(Redelivered, QPid, State1),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -1085,10 +1066,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
@@ -1096,8 +1082,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1261,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1307,7 +1323,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1397,11 +1414,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index f224b043..9a6879b1 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -414,16 +414,25 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> case mnesia:is_transaction() of
- true -> ok;
- false -> mnesia_sync:sync()
- end,
- Result;
- {aborted, Reason} -> throw({error, Reason})
+ case worker_pool:submit(
+ fun () ->
+ case mnesia:is_transaction() of
+ false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
+ Res = mnesia:sync_transaction(TxFun),
+ DiskLogAfter = mnesia_dumper:get_log_writes(),
+ case DiskLogAfter == DiskLogBefore of
+ true -> Res;
+ false -> {sync, Res}
+ end;
+ true -> mnesia:sync_transaction(TxFun)
+ end
+ end) of
+ {sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
+ {sync, {aborted, Reason}} -> throw({error, Reason});
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
end.
-
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->