diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:38:02 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-01 20:38:02 +0000 |
commit | 0924c139a42969e1b76aea814f2e6447e368fa45 (patch) | |
tree | d159ea32eee32f8cf4a5be4c5b1462fd4cc28914 | |
parent | 8a47f1821ac5e532fa5c8185107f42e8c8142848 (diff) | |
download | rabbitmq-server-0924c139a42969e1b76aea814f2e6447e368fa45.tar.gz |
introduce channel registry
-rw-r--r-- | src/rabbit_channel.erl | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f8e10097..ab4fc6e3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,6 +37,7 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). +-export([all/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). @@ -62,6 +63,7 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(all/0 :: () -> [pid()]). -endif. @@ -91,12 +93,16 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> conserve_memory(Pid, Conserve) -> gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). +all() -> + pg_local:get_members(rabbit_channels). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + ok = pg_local:join(rabbit_channels, self()), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -168,20 +174,16 @@ handle_info(timeout, State) -> ok = clear_permission_cache(), {noreply, State, hibernate}. -terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, - state = terminating}) -> - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid); +terminate(_Reason, State = #ch{state = terminating}) -> + terminate(State); -terminate(Reason, State = #ch{writer_pid = WriterPid, - limiter_pid = LimiterPid}) -> +terminate(Reason, State) -> Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok end, - rabbit_writer:shutdown(WriterPid), - rabbit_limiter:shutdown(LimiterPid). + terminate(State). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -951,3 +953,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. + +terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> + pg_local:leave(rabbit_channels, self()), + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid). |