diff options
-rw-r--r-- | include/rabbit.hrl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 7 |
4 files changed, 28 insertions, 29 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index bf680217..e8255346 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -66,10 +66,10 @@ -record(listener, {node, protocol, host, port}). -record(basic_message, {exchange_name, routing_key, content, guid, - is_persistent}). + is_persistent, msg_seq_no, origin}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message, msg_seq_no}). +-record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). -record(event, {type, props, timestamp}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4bb2109e..38eedad3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -324,7 +324,7 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers}) -> @@ -344,13 +344,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), + ConfirmFun(Message), ChAckTags1 = case AckRequired of - true -> - rabbit_log:info("Delivered message but waiting for ack~n"), - sets:add_element(AckTag, ChAckTags); - false -> - rabbit_log:info("Delivered message and not waiting for ack~n"), - ChAckTags + true -> sets:add_element(AckTag, ChAckTags); + false -> ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -387,7 +384,6 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end; {empty, _} -> - rabbit_log:info("Message on a queue without consumers~n"), {FunAcc, State} end. @@ -402,15 +398,23 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. +confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> + case MsgSeqNo of + undefined -> ok; + _ -> rabbit_channel:confirm(ChPid, MsgSeqNo) + end. + run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, - fun deliver_from_queue_deliver/3}, + fun deliver_from_queue_deliver/3, + fun confirm_function/1}, IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) -> - rabbit_log:info("Attempting delivery of message~n"), +attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ}) -> + rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]), PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -419,10 +423,10 @@ attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) -> {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, - ConfirmFun = fun() -> rabbit_channel:confirm(ChPid, -1) end, - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + ConfirmFun = fun confirm_function/1, + deliver_msgs_to_consumers({ PredFun, DeliverFun, ConfirmFun }, false, State); +attempt_delivery(Txn, ChPid, Message, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. @@ -433,6 +437,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers + rabbit_log:info("No consumers on unblocked channels; enqueueing...~n"), BQS = BQ:publish(Message, State #q.backing_queue_state), {false, NewState#q{backing_queue_state = BQS}} end. @@ -806,7 +811,6 @@ handle_cast({deliver, Txn, Message, ChPid}, State) -> handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - rabbit_log:info("Queue process got an ack~n"), case lookup_ch(ChPid) of not_found -> noreply(State); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3aa25d63..2682203a 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4, delivery/5]). +-export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -53,10 +53,6 @@ -spec(delivery/4 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_type:txn()), rabbit_types:message()) -> rabiit_types:delivery()). --spec(delivery/5 :: - (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message(), integer() | undefined) - -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) @@ -97,10 +93,8 @@ publish(Delivery = #delivery{ end. delivery(Mandatory, Immediate, Txn, Message) -> - delivery(Mandatory, Immediate, Txn, Message, undefined). -delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message, msg_seq_no = MsgSeqNo}. + sender = self(), message = Message}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58f03e7a..fe303261 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -490,11 +490,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + is_persistent = IsPersistent, + msg_seq_no = MsgSeqNo, + origin = self()}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), case RoutingRes of routed -> ok; unroutable -> ok = basic_return(Message, WriterPid, no_route); @@ -512,7 +514,6 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> - rabbit_log:info("channel received a basic.ack~n"), {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QIncs = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QIncs], |