summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-01 20:38:02 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-01 20:38:02 +0000
commit0924c139a42969e1b76aea814f2e6447e368fa45 (patch)
treed159ea32eee32f8cf4a5be4c5b1462fd4cc28914
parent8a47f1821ac5e532fa5c8185107f42e8c8142848 (diff)
downloadrabbitmq-server-0924c139a42969e1b76aea814f2e6447e368fa45.tar.gz
introduce channel registry
-rw-r--r--src/rabbit_channel.erl23
1 files changed, 15 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f8e10097..ab4fc6e3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,6 +37,7 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
+-export([all/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
@@ -62,6 +63,7 @@
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(all/0 :: () -> [pid()]).
-endif.
@@ -91,12 +93,16 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
conserve_memory(Pid, Conserve) ->
gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
+all() ->
+ pg_local:get_members(rabbit_channels).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ ok = pg_local:join(rabbit_channels, self()),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -168,20 +174,16 @@ handle_info(timeout, State) ->
ok = clear_permission_cache(),
{noreply, State, hibernate}.
-terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
- state = terminating}) ->
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid);
+terminate(_Reason, State = #ch{state = terminating}) ->
+ terminate(State);
-terminate(Reason, State = #ch{writer_pid = WriterPid,
- limiter_pid = LimiterPid}) ->
+terminate(Reason, State) ->
Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
end,
- rabbit_writer:shutdown(WriterPid),
- rabbit_limiter:shutdown(LimiterPid).
+ terminate(State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -951,3 +953,8 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
+
+terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
+ pg_local:leave(rabbit_channels, self()),
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid).