From cff5049421812ce815081d2dc30a99ea24324bcb Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 7 Jul 2010 15:36:23 +0100 Subject: Rename rabbit_reader_queue_collector to rabbit_queue_collector, and shorten some function and variable names. --- src/rabbit_channel.erl | 2 +- src/rabbit_queue_collector.erl | 106 +++++++++++++++++++++++++++++++++ src/rabbit_reader.erl | 6 +- src/rabbit_reader_queue_collector.erl | 108 ---------------------------------- 4 files changed, 110 insertions(+), 112 deletions(-) create mode 100644 src/rabbit_queue_collector.erl delete mode 100644 src/rabbit_reader_queue_collector.erl diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index fbbc50b9..da8225de 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -735,7 +735,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, %% the connection shuts down. ok = case Owner of none -> ok; - _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) + _ -> rabbit_queue_collector:register(CollectorPid, Q) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl new file mode 100644 index 00000000..ea3768d4 --- /dev/null +++ b/src/rabbit_queue_collector.erl @@ -0,0 +1,106 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_queue_collector). + +-behaviour(gen_server). + +-export([start_link/0, register/2, delete_all/1, shutdown/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {queues}). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/0 :: () -> rabbit_types:ok(pid())). +-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). +-spec(delete_all/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link(?MODULE, [], []). + +register(CollectorPid, Q) -> + gen_server:call(CollectorPid, {register, Q}, infinity). + +delete_all(CollectorPid) -> + gen_server:call(CollectorPid, delete_all, infinity). + +shutdown(CollectorPid) -> + gen_server:call(CollectorPid, shutdown, infinity). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, #state{queues = dict:new()}}. + +%%-------------------------------------------------------------------------- + +handle_call({register, Q}, _From, + State = #state{queues = Queues}) -> + MonitorRef = erlang:monitor(process, Q#amqqueue.pid), + {reply, ok, + State#state{queues = dict:store(MonitorRef, Q, Queues)}}; + +handle_call(delete_all, _From, State = #state{queues = Queues}) -> + [rabbit_misc:with_exit_handler( + fun () -> ok end, + fun () -> + erlang:demonitor(MonitorRef), + rabbit_amqqueue:delete(Q, false, false) + end) + || {MonitorRef, Q} <- dict:to_list(Queues)], + {reply, ok, State}; + +handle_call(shutdown, _From, State) -> + {stop, normal, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, + State = #state{queues = Queues}) -> + {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e355cd26..b5514c82 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -240,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - {ok, Collector} = rabbit_reader_queue_collector:start_link(), + {ok, Collector} = rabbit_queue_collector:start_link(), try mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, @@ -272,7 +272,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), - rabbit_reader_queue_collector:shutdown(Collector), + rabbit_queue_collector:shutdown(Collector), rabbit_misc:unlink_and_capture_exit(Collector) end, done. @@ -444,7 +444,7 @@ maybe_close(State = #v1{connection_state = closing, %% connection, and are deleted when that connection closes." %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. - rabbit_reader_queue_collector:delete_all(Collector), + rabbit_queue_collector:delete_all(Collector), ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), close_connection(State); _ -> State diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_reader_queue_collector.erl deleted file mode 100644 index a9117e9c..00000000 --- a/src/rabbit_reader_queue_collector.erl +++ /dev/null @@ -1,108 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_reader_queue_collector). - --behaviour(gen_server). - --export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --record(state, {exclusive_queues}). - --include("rabbit.hrl"). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/0 :: () -> rabbit_types:ok(pid())). --spec(register_exclusive_queue/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok'). --spec(delete_all/1 :: (pid()) -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link(?MODULE, [], []). - -register_exclusive_queue(CollectorPid, Q) -> - gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity). - -delete_all(CollectorPid) -> - gen_server:call(CollectorPid, delete_all, infinity). - -shutdown(CollectorPid) -> - gen_server:call(CollectorPid, shutdown, infinity). - -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, #state{exclusive_queues = dict:new()}}. - -%%-------------------------------------------------------------------------- - -handle_call({register_exclusive_queue, Q}, _From, - State = #state{exclusive_queues = Queues}) -> - MonitorRef = erlang:monitor(process, Q#amqqueue.pid), - {reply, ok, - State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}}; - -handle_call(delete_all, _From, - State = #state{exclusive_queues = ExclusiveQueues}) -> - [rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> - erlang:demonitor(MonitorRef), - rabbit_amqqueue:delete(Q, false, false) - end) - || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)], - {reply, ok, State}; - -handle_call(shutdown, _From, State) -> - {stop, normal, ok, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason}, - State = #state{exclusive_queues = ExclusiveQueues}) -> - {noreply, State#state{exclusive_queues = - dict:erase(MonitorRef, ExclusiveQueues)}}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. -- cgit v1.2.1