summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl44
1 files changed, 18 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dae0a96e..6abca523 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,7 +40,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
@@ -92,7 +92,6 @@ init([ReaderPid, WriterPid, Username, VHost]) ->
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
{ok, #ch{state = starting,
- proxy_pid = self(),
reader_pid = ReaderPid,
writer_pid = WriterPid,
transaction_id = none,
@@ -135,12 +134,10 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
{noreply, State};
handle_cast({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
+ State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
- ok = internal_deliver(WriterPid, ProxyPid,
- true, ConsumerTag, DeliveryTag, Msg),
+ ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
{noreply, State1#ch{next_tag = DeliveryTag + 1}};
handle_cast({conserve_memory, Conserve}, State) ->
@@ -284,7 +281,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
+ Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
none -> State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
@@ -297,12 +294,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -328,8 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
+ _, State = #ch{ reader_pid = ReaderPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -347,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, self(),
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -378,8 +374,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping }) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -400,7 +395,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% cancel_ok ourselves it might overtake a
%% message sent previously by the queue.
rabbit_amqqueue:basic_cancel(
- Q, ProxyPid, ConsumerTag,
+ Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
consumer_tag = ConsumerTag}))
end) of
@@ -418,7 +413,6 @@ handle_method(#'basic.qos'{}, _, State) ->
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -427,14 +421,13 @@ handle_method(#'basic.recover'{requeue = true},
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), ProxyPid)
+ QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
lists:foreach(
@@ -452,8 +445,7 @@ handle_method(#'basic.recover'{requeue = false},
%%
%% FIXME: should we allocate a fresh DeliveryTag?
ok = internal_deliver(
- WriterPid, ProxyPid,
- false, ConsumerTag, DeliveryTag,
+ WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
%% No answer required, apparently!
@@ -742,10 +734,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
State#ch{tx_participants = sets:union(Participants,
sets:from_list(MoreP))}.
-ack(ProxyPid, TxnKey, UAQ) ->
+ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid),
+ ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
[QPid | L]
end, [], UAQ).
@@ -797,7 +789,7 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
+notify_queues(#ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:notify_down_all(
[QPid || QueueName <-
sets:to_list(
@@ -809,7 +801,7 @@ notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
%% queue has been deleted in the meantime
{error, not_found} -> QPid = none, false
end],
- ProxyPid).
+ self()).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
@@ -827,7 +819,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
+internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -839,6 +831,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
routing_key = RoutingKey},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, ChPid, M, Content);
+ WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.