diff options
author | Emile Joubert <emile@rabbitmq.com> | 2012-02-08 18:04:57 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2012-02-08 18:04:57 +0000 |
commit | 22871e540c28aab4cb2cab1184f6cdb83dfe100f (patch) | |
tree | 589886fdb16c2845f0f3d1d35134b87c63855f8c | |
parent | 0535af7856437bc36fd37c30521dc47bff91e644 (diff) | |
parent | 0e74226fc10d381c15eb1c4e68fa4bdd3643133f (diff) | |
download | rabbitmq-server-22871e540c28aab4cb2cab1184f6cdb83dfe100f.tar.gz |
Merged bug24505 into default
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 140 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 25 |
3 files changed, 117 insertions, 103 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c4921510..5629b836 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -388,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) -> {_, _} -> ok end. -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, +deliver_msgs_to_consumers(_DeliverFun, true, State) -> + {true, State}; +deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> - case PredFun(FunAcc, State) of - false -> {FunAcc, State}; - true -> case queue:out(ActiveConsumers) of - {empty, _} -> - {FunAcc, State}; - {{value, QEntry}, Tail} -> - {FunAcc1, State1} = - deliver_msg_to_consumer( + case queue:out(ActiveConsumers) of + {empty, _} -> + {false, State}; + {{value, QEntry}, Tail} -> + {Stop, State1} = deliver_msg_to_consumer( DeliverFun, QEntry, - FunAcc, State#q{active_consumers = Tail}), - deliver_msgs_to_consumers(Funs, FunAcc1, State1) - end + State#q{active_consumers = Tail}), + deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {FunAcc, State}; + {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, self(), Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), - {FunAcc, State}; + {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), deliver_msg_to_consumer( - DeliverFun, Consumer, C, FunAcc, + DeliverFun, Consumer, C, State#q{active_consumers = AC1}) end end. @@ -426,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - FunAcc, State = #q{q = #amqqueue{name = QName}}) -> - {{Message, IsDelivered, AckTag}, FunAcc1, State1} = - DeliverFun(AckRequired, FunAcc, State), + State = #q{q = #amqqueue{name = QName}}) -> + {{Message, IsDelivered, AckTag}, Stop, State1} = + DeliverFun(AckRequired, State), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -437,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {FunAcc1, State1}. - -deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. + {Stop, State1}. -deliver_from_queue_deliver(AckRequired, false, State) -> +deliver_from_queue_deliver(AckRequired, State) -> {{Message, IsDelivered, AckTag, Remaining}, State1} = fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. @@ -485,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) -> State. run_message_queue(State) -> - Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3}, State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = drop_expired_messages(State), - IsEmpty = BQ:is_empty(BQS), - {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), + {_IsEmpty1, State2} = deliver_msgs_to_consumers( + fun deliver_from_queue_deliver/2, + BQ:is_empty(BQS), State1), State2. attempt_delivery(Delivery = #delivery{sender = ChPid, @@ -504,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end, case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = - fun (AckRequired, false, - State1 = #q{backing_queue_state = BQS2}) -> + fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> %% we don't need an expiry here because %% messages are not being enqueued, so we use %% an empty message_properties. @@ -521,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, State1#q{backing_queue_state = BQS3}} end, {Delivered, State2} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + deliver_msgs_to_consumers(DeliverFun, false, State#q{backing_queue_state = BQS1}), {Delivered, Confirm, State2}; {Duplicate, BQS1} -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index af8d4c3a..a101886f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,9 +33,9 @@ -export([list_local/0]). -record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid, - limiter, tx_status, next_tag, - unacked_message_q, uncommitted_message_q, uncommitted_acks, - user, virtual_host, most_recently_declared_queue, queue_monitors, + limiter, tx_status, next_tag, unacked_message_q, + uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user, + virtual_host, most_recently_declared_queue, queue_monitors, consumer_mapping, blocking, queue_consumers, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, unconfirmed_qm, confirmed, capabilities, trace_state}). @@ -191,6 +191,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unacked_message_q = queue:new(), uncommitted_message_q = queue:new(), uncommitted_acks = [], + uncommitted_nacks = [], user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -295,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> handle_cast({deliver, ConsumerTag, AckRequired, Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> - State1 = lock_message(AckRequired, - ack_record(DeliveryTag, ConsumerTag, Msg), - State), - - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State1#ch{next_tag = DeliveryTag + 1}); - + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> + ok = rabbit_writer:send_command_and_notify( + WriterPid, QPid, self(), + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), @@ -695,38 +686,28 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag, - trace_state = TraceState}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( QueueName, ConnPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, QPid, _MsgId, Redelivered, + Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, - routing_keys = [RoutingKey | _CcRoutes], - content = Content}}} -> - State1 = lock_message(not(NoAck), - ack_record(DeliveryTag, none, Msg), - State), - maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - maybe_incr_redeliver_stats(Redelivered, QPid, State1), - rabbit_trace:tap_trace_out(Msg, TraceState), + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}} -> ok = rabbit_writer:send_command( WriterPid, - #'basic.get_ok'{delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey, + #'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, record_sent(none, not(NoAck), Msg, State)}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -1085,10 +1066,15 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); -handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, - uncommitted_acks = TAL}) -> +handle_method(#'tx.commit'{}, _, + State = #ch{uncommitted_message_q = TMQ, + uncommitted_acks = TAL, + uncommitted_nacks = TNL, + limiter = Limiter}) -> State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), ack(TAL, State1), + lists:foreach( + fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL), {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> @@ -1096,8 +1082,10 @@ handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, - uncommitted_acks = TAL}) -> - UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))), + uncommitted_acks = TAL, + uncommitted_nacks = TNL}) -> + TNL1 = lists:append([L || {_, L} <- TNL]), + UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})}; handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) -> @@ -1261,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). -reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) -> +reject(DeliveryTag, Requeue, Multiple, + State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), + State1 = State#ch{unacked_message_q = Remaining}, + {noreply, + case TxStatus of + none -> + reject(Requeue, Acked, State1#ch.limiter), + State1; + in_progress -> + State1#ch{uncommitted_nacks = + [{Requeue, Acked} | State1#ch.uncommitted_nacks]} + end}. + +reject(Requeue, Acked, Limiter) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) end, ok, Acked), - ok = notify_limiter(State#ch.limiter, Acked), - {noreply, State#ch{unacked_message_q = Remaining}}. - -ack_record(DeliveryTag, ConsumerTag, - _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> - {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + ok = notify_limiter(Limiter, Acked). + +record_sent(ConsumerTag, AckRequired, + Msg = {_QName, QPid, MsgId, Redelivered, _Message}, + State = #ch{unacked_message_q = UAMQ, + next_tag = DeliveryTag, + trace_state = TraceState}) -> + maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), + maybe_incr_redeliver_stats(Redelivered, QPid, State), + rabbit_trace:tap_trace_out(Msg, TraceState), + UAMQ1 = case AckRequired of + true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + UAMQ); + false -> UAMQ + end, + State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. collect_acks(Q, 0, true) -> {queue:to_list(Q), queue:new()}; @@ -1307,7 +1323,8 @@ ack(Acked, State) -> maybe_incr_stats(QIncs, ack, State). new_tx(State) -> State#ch{uncommitted_message_q = queue:new(), - uncommitted_acks = []}. + uncommitted_acks = [], + uncommitted_nacks = []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; @@ -1397,11 +1414,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> end end, State#ch{unconfirmed_mq = UMQ1}, QPids). -lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> - State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; -lock_message(false, _MsgStruct, State) -> - State. - send_nacks([], State) -> State; send_nacks(MXs, State = #ch{tx_status = none}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index f224b043..9a6879b1 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -414,16 +414,25 @@ execute_mnesia_transaction(TxFun) -> %% Making this a sync_transaction allows us to use dirty_read %% elsewhere and get a consistent result even when that read %% executes on a different node. - case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of - {atomic, Result} -> case mnesia:is_transaction() of - true -> ok; - false -> mnesia_sync:sync() - end, - Result; - {aborted, Reason} -> throw({error, Reason}) + case worker_pool:submit( + fun () -> + case mnesia:is_transaction() of + false -> DiskLogBefore = mnesia_dumper:get_log_writes(), + Res = mnesia:sync_transaction(TxFun), + DiskLogAfter = mnesia_dumper:get_log_writes(), + case DiskLogAfter == DiskLogBefore of + true -> Res; + false -> {sync, Res} + end; + true -> mnesia:sync_transaction(TxFun) + end + end) of + {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; + {sync, {aborted, Reason}} -> throw({error, Reason}); + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) end. - %% Like execute_mnesia_transaction/1 with additional Pre- and Post- %% commit function execute_mnesia_transaction(TxFun, PrePostCommitFun) -> |