summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-08 17:23:17 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-08 17:23:17 +0100
commit8eea1585611cae4a28895315ade4c1667f30e011 (patch)
tree2e5cc475838ae5c46929c78cabdd101269af086d
parent55f93eafff65a126679c264db2bdb3e3b63f124d (diff)
parenta024ef975fac20a84c4b6209954f8ab38ac8257b (diff)
downloadrabbitmq-server-8eea1585611cae4a28895315ade4c1667f30e011.tar.gz
Merging bug 22935 into default
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_invariable_queue.erl3
-rw-r--r--src/rabbit_queue_collector.erl (renamed from src/rabbit_reader_queue_collector.erl)30
-rw-r--r--src/rabbit_reader.erl6
4 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d30b0367..9a02e2bd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -735,7 +735,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
%% the connection shuts down.
ok = case Owner of
none -> ok;
- _ -> rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ _ -> rabbit_queue_collector:register(CollectorPid, Q)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl
index df8adb2e..8214b976 100644
--- a/src/rabbit_invariable_queue.erl
+++ b/src/rabbit_invariable_queue.erl
@@ -242,8 +242,7 @@ do_if_persistent(F, Txn, QName) ->
persist_message(QName, true, Txn, Msg = #basic_message {
is_persistent = true }) ->
Msg1 = Msg #basic_message {
- %% don't persist any recoverable decoded properties,
- %% rebuild from properties_bin on restore
+ %% don't persist any recoverable decoded properties
content = rabbit_binary_parser:clear_decoded_content(
Msg #basic_message.content)},
persist_work(Txn, QName,
diff --git a/src/rabbit_reader_queue_collector.erl b/src/rabbit_queue_collector.erl
index a9117e9c..ea3768d4 100644
--- a/src/rabbit_reader_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -29,16 +29,16 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_reader_queue_collector).
+-module(rabbit_queue_collector).
-behaviour(gen_server).
--export([start_link/0, register_exclusive_queue/2, delete_all/1, shutdown/1]).
+-export([start_link/0, register/2, delete_all/1, shutdown/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {exclusive_queues}).
+-record(state, {queues}).
-include("rabbit.hrl").
@@ -47,7 +47,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok(pid())).
--spec(register_exclusive_queue/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
+-spec(register/2 :: (pid(), rabbit_types:amqqueue()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
-endif.
@@ -57,8 +57,8 @@
start_link() ->
gen_server:start_link(?MODULE, [], []).
-register_exclusive_queue(CollectorPid, Q) ->
- gen_server:call(CollectorPid, {register_exclusive_queue, Q}, infinity).
+register(CollectorPid, Q) ->
+ gen_server:call(CollectorPid, {register, Q}, infinity).
delete_all(CollectorPid) ->
gen_server:call(CollectorPid, delete_all, infinity).
@@ -69,25 +69,24 @@ shutdown(CollectorPid) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state{exclusive_queues = dict:new()}}.
+ {ok, #state{queues = dict:new()}}.
%%--------------------------------------------------------------------------
-handle_call({register_exclusive_queue, Q}, _From,
- State = #state{exclusive_queues = Queues}) ->
+handle_call({register, Q}, _From,
+ State = #state{queues = Queues}) ->
MonitorRef = erlang:monitor(process, Q#amqqueue.pid),
{reply, ok,
- State#state{exclusive_queues = dict:store(MonitorRef, Q, Queues)}};
+ State#state{queues = dict:store(MonitorRef, Q, Queues)}};
-handle_call(delete_all, _From,
- State = #state{exclusive_queues = ExclusiveQueues}) ->
+handle_call(delete_all, _From, State = #state{queues = Queues}) ->
[rabbit_misc:with_exit_handler(
fun () -> ok end,
fun () ->
erlang:demonitor(MonitorRef),
rabbit_amqqueue:delete(Q, false, false)
end)
- || {MonitorRef, Q} <- dict:to_list(ExclusiveQueues)],
+ || {MonitorRef, Q} <- dict:to_list(Queues)],
{reply, ok, State};
handle_call(shutdown, _From, State) ->
@@ -97,9 +96,8 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({'DOWN', MonitorRef, process, _DownPid, _Reason},
- State = #state{exclusive_queues = ExclusiveQueues}) ->
- {noreply, State#state{exclusive_queues =
- dict:erase(MonitorRef, ExclusiveQueues)}}.
+ State = #state{queues = Queues}) ->
+ {noreply, State#state{queues = dict:erase(MonitorRef, Queues)}}.
terminate(_Reason, _State) ->
ok.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e355cd26..b5514c82 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -240,7 +240,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- {ok, Collector} = rabbit_reader_queue_collector:start_link(),
+ {ok, Collector} = rabbit_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -272,7 +272,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%%
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
- rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_queue_collector:shutdown(Collector),
rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -444,7 +444,7 @@ maybe_close(State = #v1{connection_state = closing,
%% connection, and are deleted when that connection closes."
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
- rabbit_reader_queue_collector:delete_all(Collector),
+ rabbit_queue_collector:delete_all(Collector),
ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
close_connection(State);
_ -> State