summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl4
-rw-r--r--src/rabbit_amqqueue_process.erl36
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl7
4 files changed, 28 insertions, 29 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index bf680217..e8255346 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -66,10 +66,10 @@
-record(listener, {node, protocol, host, port}).
-record(basic_message, {exchange_name, routing_key, content, guid,
- is_persistent}).
+ is_persistent, msg_seq_no, origin}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4bb2109e..38eedad3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -324,7 +324,7 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun, ConfirmFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
@@ -344,13 +344,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
+ ConfirmFun(Message),
ChAckTags1 = case AckRequired of
- true ->
- rabbit_log:info("Delivered message but waiting for ack~n"),
- sets:add_element(AckTag, ChAckTags);
- false ->
- rabbit_log:info("Delivered message and not waiting for ack~n"),
- ChAckTags
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -387,7 +384,6 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end;
{empty, _} ->
- rabbit_log:info("Message on a queue without consumers~n"),
{FunAcc, State}
end.
@@ -402,15 +398,23 @@ deliver_from_queue_deliver(AckRequired, false,
{{Message, IsDelivered, AckTag}, 0 == Remaining,
State #q { backing_queue_state = BQS1 }}.
+confirm_function(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
+ case MsgSeqNo of
+ undefined -> ok;
+ _ -> rabbit_channel:confirm(ChPid, MsgSeqNo)
+ end.
+
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
+ fun deliver_from_queue_deliver/3,
+ fun confirm_function/1},
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) ->
- rabbit_log:info("Attempting delivery of message~n"),
+attempt_delivery(none, ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo},
+ State = #q{backing_queue = BQ}) ->
+ rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]),
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -419,10 +423,10 @@ attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) ->
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
- ConfirmFun = fun() -> rabbit_channel:confirm(ChPid, -1) end,
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ ConfirmFun = fun confirm_function/1,
+ deliver_msgs_to_consumers({ PredFun, DeliverFun, ConfirmFun }, false, State);
+attempt_delivery(Txn, ChPid, Message,
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
@@ -433,6 +437,7 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
+ rabbit_log:info("No consumers on unblocked channels; enqueueing...~n"),
BQS = BQ:publish(Message, State #q.backing_queue_state),
{false, NewState#q{backing_queue_state = BQS}}
end.
@@ -806,7 +811,6 @@ handle_cast({deliver, Txn, Message, ChPid}, State) ->
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
- rabbit_log:info("Queue process got an ack~n"),
case lookup_ch(ChPid) of
not_found ->
noreply(State);
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3aa25d63..2682203a 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/4, properties/1, delivery/4, delivery/5]).
+-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
-export([is_message_persistent/1]).
@@ -53,10 +53,6 @@
-spec(delivery/4 ::
(boolean(), boolean(), rabbit_types:maybe(rabbit_type:txn()),
rabbit_types:message()) -> rabiit_types:delivery()).
--spec(delivery/5 ::
- (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message(), integer() | undefined)
- -> rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
properties_input(), binary())
@@ -97,10 +93,8 @@ publish(Delivery = #delivery{
end.
delivery(Mandatory, Immediate, Txn, Message) ->
- delivery(Mandatory, Immediate, Txn, Message, undefined).
-delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
+ sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 58f03e7a..fe303261 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -490,11 +490,13 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
guid = rabbit_guid:guid(),
- is_persistent = IsPersistent},
+ is_persistent = IsPersistent,
+ msg_seq_no = MsgSeqNo,
+ origin = self()},
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
case RoutingRes of
routed -> ok;
unroutable -> ok = basic_return(Message, WriterPid, no_route);
@@ -512,7 +514,6 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
- rabbit_log:info("channel received a basic.ack~n"),
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QIncs = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QIncs],