diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 10:48:21 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 10:48:21 +0100 |
commit | 171d7882d6cc7992b539a8eb1b629d7a84d31ba5 (patch) | |
tree | 4b263d4f54d523a7c194f2e3fff79f285bce4597 | |
parent | 78377bc08a5674a8e051caa17ecfb72c8a12eaa3 (diff) | |
download | rabbitmq-server-171d7882d6cc7992b539a8eb1b629d7a84d31ba5.tar.gz |
do_if_not_dup -> do_if_unconfirmed; bugfixes
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 26 |
2 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5b31c8ce..f1085a0c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -717,7 +717,7 @@ handle_call({deliver_immediately, Delivery = #delivery{message = Msg}} State1 = record_confirm_message(Delivery, State), {Delivered, State2} = attempt_delivery(Delivery, State1), State3 = case Delivered of - true -> State2, + true -> State2; false -> confirm_message(Msg#basic_message.guid, State2) end, reply(Delivered, State3); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a755b073..d641e824 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -452,18 +452,18 @@ send_or_enqueue_ack(undefined, State) -> send_or_enqueue_ack(_, State = #ch{confirm_enabled = false}) -> State; send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = false}) -> - do_if_not_dup(MsgSeqNo, State, - fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> - ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = MSN}), - State1 - end); + do_if_unconfirmed(MsgSeqNo, State, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{delivery_tag = MSN}), + State1 + end); send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm_multiple = true}) -> - do_if_not_dup(MsgSeqNo, State, - fun(MSN, State1 = #ch{held_confirms = As}) -> - start_ack_timer(State1#ch{held_confirms = - gb_sets:add(MSN, As)}) - end). + do_if_unconfirmed(MsgSeqNo, State, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_ack_timer(State1#ch{held_confirms = + gb_sets:add(MSN, As)}) + end). msg_sent_to_queue(undefined, _QPid, State) -> State; @@ -473,9 +473,9 @@ msg_sent_to_queue(MsgSeqNo, QPid, State = #ch{qpid_to_msgs = QTM}) -> error -> erlang:monitor(process, QPid), gb_sets:new() end, - State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1))}. + State#ch{qpid_to_msgs = dict:store(QPid, gb_sets:add(MsgSeqNo, Msgs1), QTM)}. -do_if_not_dup(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> +do_if_unconfirmed(MsgSeqNo, State = #ch{unconfirmed = UC}, Fun) -> case gb_sets:is_element(MsgSeqNo, UC) of true -> QTM = dict:map(fun (_, Msgs) -> gb_sets:delete_any(MsgSeqNo, Msgs) |