first cut of turning rabbit_channel into a gen_server2
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ca2782c7..454701ea 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,11 +33,12 @@
-export([start_link/4, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-%% callbacks
--export([init/2, handle_message/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
-record(ch, {state, proxy_pid, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
@@ -62,102 +63,96 @@
start_link(ReaderPid, WriterPid, Username, VHost) ->
- buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid,
- Username, VHost]).
+ {ok, Pid} = gen_server2:start_link(
+ ?MODULE, [ReaderPid, WriterPid, Username, VHost], []),
+ Pid.
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- Pid ! {method, Method, Content},
- ok.
+ gen_server2:cast(Pid, {method, Method, Content}).
shutdown(Pid) ->
- Pid ! terminate,
- ok.
+ gen_server2:cast(Pid, terminate).
send_command(Pid, Msg) ->
- Pid ! {command, Msg},
- ok.
+ gen_server2:cast(Pid, {command, Msg}).
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- Pid ! {deliver, ConsumerTag, AckRequired, Msg},
- ok.
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
- ok.
+ gen_server2:cast(Pid, {conserve_memory, Conserve}).
-init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
+init([ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
- %% this is bypassing the proxy so alarms can "jump the queue" and
- %% be handled promptly
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- #ch{state = starting,
- proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}.
-handle_message({method, Method, Content}, State) ->
+ {ok, #ch{state = starting,
+ proxy_pid = self(),
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new()}}.
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+handle_cast({method, Method, Content}, State) ->
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
+ {noreply, NewState};
{noreply, NewState} ->
- NewState;
+ {noreply, NewState};
stop ->
- exit(normal)
+ %% TODO: this isn't quite right; it results in queues
+ %% being notified twice and rabbit_writer:shutdown being
+ %% called twice.
+ {stop, normal, State}
exit:{amqp, Error, Explanation, none} ->
- terminate({amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State);
+ {stop, {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)}, State};
exit:normal ->
- terminate(normal, State);
+ {stop, normal, State};
_:Reason ->
- terminate({Reason, erlang:get_stacktrace()}, State)
+ {stop, {Reason, erlang:get_stacktrace()}, State}
-handle_message(terminate, State) ->
- terminate(normal, State);
+handle_cast(terminate, State) ->
+ {stop, normal, State};
-handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- State;
+ {noreply, State};
-handle_message({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+ State = #ch{proxy_pid = ProxyPid,
+ writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, ProxyPid,
true, ConsumerTag, DeliveryTag, Msg),
- State1#ch{next_tag = DeliveryTag + 1};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
-handle_message({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- State;
-handle_message({'EXIT', _Pid, Reason}, State) ->
- terminate(Reason, State);
-handle_message(Other, State) ->
- terminate({unexpected_channel_message, Other}, State).
+ {noreply, State}.
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {noreply, Reason, State}.
terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
Res = notify_queues(internal_rollback(State)),
@@ -165,8 +160,12 @@ terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
normal -> ok = Res;
_ -> ok
- rabbit_writer:shutdown(WriterPid),
- exit(Reason).
+ rabbit_writer:shutdown(WriterPid).
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.