summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 10:48:21 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 10:48:21 +0100
commit171d7882d6cc7992b539a8eb1b629d7a84d31ba5 (patch)
tree4b263d4f54d523a7c194f2e3fff79f285bce4597
parent78377bc08a5674a8e051caa17ecfb72c8a12eaa3 (diff)
downloadrabbitmq-server-171d7882d6cc7992b539a8eb1b629d7a84d31ba5.tar.gz
do_if_not_dup -> do_if_unconfirmed; bugfixes
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl26
2 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5b31c8ce..f1085a0c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -717,7 +717,7 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Msg}}
State1 = record_confirm_message(Delivery, State),
{Delivered, State2} = attempt_delivery(Delivery, State1),
State3 = case Delivered of
- true -> State2,
+ true -> State2;
false -> confirm_message(Msg#basic_message.guid, State2)
end,
reply(Delivered, State3);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a755b073..d641e824 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -452,18 +452,18 @@ send_or_enqueue_ack(undefined, State) ->
send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) ->
State;
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) ->
- do_if_not_dup(MsgSeqNo, State,
- fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = MSN}),
- State1
- end);
+ do_if_unconfirmed(MsgSeqNo, State,
+ fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ State1
+ end);
send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) ->
- do_if_not_dup(MsgSeqNo, State,
- fun(MSN, State1 = #ch{held_confirms = As}) ->
- start_ack_timer(State1#ch{held_confirms =
- gb_sets:add(MSN, As)})
- end).
+ do_if_unconfirmed(MsgSeqNo, State,
+ fun(MSN, State1 = #ch{held_confirms = As}) ->
+ start_ack_timer(State1#ch{held_confirms =
+ gb_sets:add(MSN, As)})
+ end).
msg_sent_to_queue(undefined, _QPid, State) ->
State;
@@ -473,9 +473,9 @@ msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) ->
error -> erlang:monitor(process, QPid),
gb_sets:new()
end,
- State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1))}.
+ State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM)}.
-do_if_not_dup(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
+do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) ->
case gb_sets:is_element(MsgSeqNo, UC) of
true -> QTM = dict:map(fun (_, Msgs) ->
gb_sets:delete_any(MsgSeqNo, Msgs)