diff options
-rw-r--r-- | src/rabbit_channel.erl | 44 |
1 files changed, 18 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dae0a96e..6abca523 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -40,7 +40,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, @@ -92,7 +92,6 @@ init([ReaderPid, WriterPid, Username, VHost]) -> link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), {ok, #ch{state = starting, - proxy_pid = self(), reader_pid = ReaderPid, writer_pid = WriterPid, transaction_id = none, @@ -135,12 +134,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> {noreply, State}; handle_cast({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, + State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), {noreply, State1#ch{next_tag = DeliveryTag + 1}}; handle_cast({conserve_memory, Conserve}, State) -> @@ -284,7 +281,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of none -> State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, @@ -297,12 +294,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -328,8 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -347,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, self(), ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -378,8 +374,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -400,7 +395,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -418,7 +413,6 @@ handle_method(#'basic.qos'{}, _, State) -> handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -427,14 +421,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -452,8 +445,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -742,10 +734,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -797,7 +789,7 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> +notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all( [QPid || QueueName <- sets:to_list( @@ -809,7 +801,7 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> %% queue has been deleted in the meantime {error, not_found} -> QPid = none, false end], - ProxyPid). + self()). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -827,7 +819,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -839,6 +831,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. |