diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-09 12:12:38 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-09 12:12:38 +0000 |
commit | 7b62fa15b69499ed5b67c39f51126521d3e20879 (patch) | |
tree | f032b6a0477ad801caf5466ff2c84800cebf1f5f /src | |
parent | 5ee556df3bf9f60aab40fb423fe2cff377ccc394 (diff) | |
download | rabbitmq-server-7b62fa15b69499ed5b67c39f51126521d3e20879.tar.gz |
first cut of turning rabbit_channel into a gen_server2
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_channel.erl | 119 |
1 files changed, 59 insertions, 60 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c7..454701ea 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,11 +33,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). +-behaviour(gen_server2). + -export([start_link/4, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, @@ -62,102 +63,96 @@ %%---------------------------------------------------------------------------- start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). + {ok, Pid} = gen_server2:start_link( + ?MODULE, [ReaderPid, WriterPid, Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + proxy_pid = self(), + reader_pid = ReaderPid, + writer_pid = WriterPid, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + {noreply, NewState}; {noreply, NewState} -> - NewState; + {noreply, NewState}; stop -> - exit(normal) + %% TODO: this isn't quite right; it results in queues + %% being notified twice and rabbit_writer:shutdown being + %% called twice. + {stop, normal, State} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + {stop, {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, State}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + {noreply, State}; -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{proxy_pid = ProxyPid, + writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, ProxyPid, true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; - -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); - -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). + {noreply, State}. -%%--------------------------------------------------------------------------- +handle_info({'EXIT', _Pid, Reason}, State) -> + {noreply, Reason, State}. terminate(Reason, State = #ch{writer_pid = WriterPid}) -> Res = notify_queues(internal_rollback(State)), @@ -165,8 +160,12 @@ terminate(Reason, State = #ch{writer_pid = WriterPid}) -> normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - exit(Reason). + rabbit_writer:shutdown(WriterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. |