summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-08-05 10:33:32 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-08-05 10:33:32 +0100
commit2dd6a8c56708c31b13bcd9829f508029a4f4fc10 (patch)
tree6759791b764c1299ff9dcf89025756e6254e5592
parent72fc5b19b17bf5ba9ef51f66e93544d88136855b (diff)
parent8208718a3b3828df0bec20bb86fb7e4ec654136f (diff)
downloadrabbitmq-server-2dd6a8c56708c31b13bcd9829f508029a4f4fc10.tar.gz
Merge bug23077 into default.
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_reader.erl17
-rw-r--r--src/rabbit_router.erl4
-rw-r--r--src/rabbit_tests.erl2
5 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d4226331..f85a15d3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -355,7 +355,7 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
-emit_stats(#amqqueue{pid = QPid}) ->
+emit_stats(#amqqueue{pid = QPid}) ->
delegate_pcast(QPid, 7, emit_stats).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5e1b1f71..d52660c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -241,7 +241,7 @@ stop_rate_timer(State = #q{rate_timer_ref = just_measured}) ->
stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
-
+
stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
State;
stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
@@ -887,7 +887,7 @@ handle_cast(maybe_expire, State) ->
{stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
-
+
handle_cast(emit_stats, State) ->
emit_stats(State),
noreply(State).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 532572fd..4b612f2a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -380,8 +380,14 @@ terminate(Explanation, State = #v1{connection_state = running}) ->
terminate(_Explanation, State) ->
{force, State}.
-close_connection(State = #v1{connection = #connection{
+close_connection(State = #v1{queue_collector = Collector,
+ connection = #connection{
timeout_sec = TimeoutSec}}) ->
+ %% The spec says "Exclusive queues may only be accessed by the
+ %% current connection, and are deleted when that connection
+ %% closes." This does not strictly imply synchrony, but in
+ %% practice it seems to be what people assume.
+ rabbit_queue_collector:delete_all(Collector),
%% We terminate the connection after the specified interval, but
%% no later than ?CLOSING_TIMEOUT seconds.
TimeoutMillisec =
@@ -457,18 +463,13 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector,
connection = #connection{protocol = Protocol},
sock = Sock}) ->
case all_channels() of
[] ->
- %% Spec says "Exclusive queues may only be accessed by the current
- %% connection, and are deleted when that connection closes."
- %% This does not strictly imply synchrony, but in practice it seems
- %% to be what people assume.
- rabbit_queue_collector:delete_all(Collector),
+ NewState = close_connection(State),
ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
- close_connection(State);
+ NewState;
_ -> State
end;
maybe_close(State) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index d50b9f31..ec049a1a 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,8 +69,8 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
deliver(QPids, Delivery) ->
{Success, _} =
delegate:invoke(QPids,
- fun (Pid) ->
- rabbit_amqqueue:deliver(Pid, Delivery)
+ fun (Pid) ->
+ rabbit_amqqueue:deliver(Pid, Delivery)
end),
{Routed, Handled} =
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8d9e4ae4..97960571 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1912,7 +1912,7 @@ with_fresh_variable_queue(Fun) ->
{len, 0}]),
_ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
passed.
-
+
test_variable_queue() ->
[passed = with_fresh_variable_queue(F) ||
F <- [fun test_variable_queue_dynamic_duration_change/1,