summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-05-20 15:57:32 +0100
committerSimon MacMullen <simon@lshift.net>2010-05-20 15:57:32 +0100
commita90f4f749272f3f65263e20c36800b704d318af6 (patch)
tree38345dc61ecafb03c9e1e7440649af90763047e7
parent41ce7009fc372ed971f67a6be892690a22ad33a5 (diff)
downloadrabbitmq-server-a90f4f749272f3f65263e20c36800b704d318af6.tar.gz
Move the former bug21824 into a new branch that comes off bug21915 (and thus default) rather than amqp_0_9_1.
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_channel.erl29
-rw-r--r--src/rabbit_reader.erl32
-rw-r--r--src/rabbit_tests.erl2
4 files changed, 47 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d731aa22..92e2e1c0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -819,6 +819,10 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q= #amqqueue{ exclusive_owner = DownPid}}) ->
%% Exclusively owned queues must disappear with their owner.
+ %% In the case of clean shutdown we delete the queue synchronously in the
+ %% reader - although not required by the spec this seems to match what
+ %% people expect (see bug 21824). However we need this monitor-and-async-
+ %% delete in case the connection goes away unexpectedly.
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3926d89d..eeab1fb4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -46,7 +46,7 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking}).
+ consumer_mapping, blocking, queue_collector_pid}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -66,8 +66,8 @@
-ifdef(use_specs).
--spec(start_link/5 ::
- (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -86,10 +86,10 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
{ok, Pid} = gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid,
- Username, VHost], []),
+ Username, VHost, CollectorPid], []),
Pid.
do(Pid, Method) ->
@@ -135,7 +135,7 @@ info_all(Items) ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
@@ -153,7 +153,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
- blocking = dict:new()},
+ blocking = dict:new(),
+ queue_collector_pid = CollectorPid},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -684,7 +685,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait,
arguments = Args},
_, State = #ch { virtual_host = VHostPath,
- reader_pid = ReaderPid }) ->
+ reader_pid = ReaderPid,
+ queue_collector_pid = CollectorPid }) ->
Owner = case ExclusiveDeclare of
true -> ReaderPid;
false -> none
@@ -704,6 +706,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
auto_delete = AutoDelete %% i.e,. as supplied
} ->
check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> rabbit_reader_queue_collector:register_exclusive_queue(
+ CollectorPid, Matched)
+ end,
Matched;
%% exclusivity trumps non-equivalence arbitrarily
#amqqueue{name = QueueName, exclusive_owner = ExclusiveOwner}
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index c6bd2973..f573ca6f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -56,7 +56,8 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_ref, connection_state}).
+-record(v1, {sock, connection, callback, recv_ref, connection_state,
+ queue_collector}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -234,6 +235,7 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
+ {ok, Collector} = rabbit_reader_queue_collector:start_link(),
try
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
@@ -245,7 +247,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
- connection_state = pre_init},
+ connection_state = pre_init,
+ queue_collector = Collector},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -263,7 +266,9 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% output to be sent, which results in unnecessary delays.
%%
%% gen_tcp:close(ClientSock),
- teardown_profiling(ProfilingValue)
+ teardown_profiling(ProfilingValue),
+ rabbit_reader_queue_collector:shutdown(Collector),
+ rabbit_misc:unlink_and_capture_exit(Collector)
end,
done.
@@ -426,11 +431,17 @@ wait_for_channel_termination(N, TimerRef) ->
exit(channel_termination_timeout)
end.
-maybe_close(State = #v1{connection_state = closing}) ->
+maybe_close(State = #v1{connection_state = closing,
+ queue_collector = Collector}) ->
case all_channels() of
- [] -> ok = send_on_channel0(
- State#v1.sock, #'connection.close_ok'{}),
- close_connection(State);
+ [] ->
+ %% Spec says "Exclusive queues may only be accessed by the current
+ %% connection, and are deleted when that connection closes."
+ %% This does not strictly imply synchrony, but in practice it seems
+ %% to be what people assume.
+ rabbit_reader_queue_collector:delete_all(Collector),
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ close_connection(State);
_ -> State
end;
maybe_close(State) ->
@@ -727,15 +738,16 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+send_to_new_channel(Channel, AnalyzedFrame,
+ State = #v1{queue_collector = Collector}) ->
#v1{sock = Sock, connection = #connection{
frame_max = FrameMax,
user = #user{username = Username},
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/6,
+ [Channel, self(), WriterPid, Username, VHost, Collector]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7afa7316..c9f07261 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -748,7 +748,7 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, none),
[Q, Q2] = [#amqqueue{} = rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none) ||