summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-23 10:03:15 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-23 10:03:15 +0000
commitffb21cd3e869ffb52dba5a9c091468be6bfb9eb9 (patch)
tree0449128640900015c532f2771cedfb5edb12de1a
parenta4ad123c9c39921465a26326c561c73ef50e1431 (diff)
downloadrabbitmq-server-ffb21cd3e869ffb52dba5a9c091468be6bfb9eb9.tar.gz
And now it can't deadlock. Beautiful.
-rw-r--r--src/rabbit_amqqueue.erl11
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_limiter.erl33
3 files changed, 32 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 169c7c6d..20742427 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,7 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/3]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -100,7 +100,7 @@
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> boolean()).
+-spec(unblock/3 :: (('sync' | 'async'), pid(), pid()) -> boolean()).
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -305,8 +305,11 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
-unblock(QPid, ChPid) ->
- gen_server2:pcall(QPid, 8, {unblock, ChPid}).
+unblock(sync, QPid, ChPid) ->
+ gen_server2:pcall(QPid, 8, {unblock, ChPid}, 100);
+unblock(async, QPid, ChPid) ->
+ ok = gen_server2:pcast(QPid, 8, {unblock, ChPid}),
+ false.
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ca14c3dd..cb53f4e0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -814,7 +814,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end, fun (_, _) -> ok end)).
+ end, fun (_, _) -> ok end));
+
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (_, _) -> ok end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4e95f37d..413658d8 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -135,7 +135,6 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
true -> Volume - Count
end,
- io:format("~p OldVolume ~p ; Count ~p ; NewVolume ~p~n", [self(), Volume, Count, NewVolume]),
{noreply, maybe_notify(State, State#lim{volume = NewVolume})};
handle_cast({register, QPid}, State) ->
@@ -177,7 +176,7 @@ forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
case dict:find(QPid, Queues) of
{ok, {MRef, _, _}} ->
true = erlang:demonitor(MRef),
- unblock(QPid, ChPid),
+ unblock(async, QPid, ChPid),
State#lim{queues = dict:erase(QPid, Queues)};
error -> State
end.
@@ -207,22 +206,22 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues,
%% try to tell enough queues that we guarantee we'll get
%% blocked again
{Capacity1, NewQueues1} =
- unblock_queue(ChPid, BiggestLength1, Capacity, QList,
+ unblock_queue(sync, ChPid, BiggestLength1, Capacity, QList,
Queues),
case 0 == Capacity1 of
true -> NewQueues1;
false -> %% just tell everyone
{_Capacity2, NewQueues2} =
- unblock_queue(ChPid, 1, QCount, QList, NewQueues1),
+ unblock_queue(async, ChPid, 1, QCount, QList,
+ NewQueues1),
NewQueues2
end
end,
State#lim{queues = NewQueues}.
-unblock_queue(_ChPid, _L, 0, _QList, Queues) ->
+unblock_queue(_Mode, _ChPid, _L, 0, _QList, Queues) ->
{0, Queues};
-unblock_queue(ChPid, L, QueueCount, QList, Queues) ->
- UpdateFunUnBlock = fun ({MRef, _, Length}) -> {MRef, false, Length} end,
+unblock_queue(Mode, ChPid, L, QueueCount, QList, Queues) ->
{Length, QPid, QList1} = gb_trees:take_largest(QList),
{_MRef, Blocked, Length} = dict:fetch(QPid, Queues),
{QueueCount1, Queues1} =
@@ -232,20 +231,24 @@ unblock_queue(ChPid, L, QueueCount, QList, Queues) ->
true ->
%% if 0 == Length, and L == 1, we still need to inform the q
case Length + 1 >= random:uniform(L) of
- true -> case unblock(QPid, ChPid) of
- true -> {QueueCount - 1,
- dict:update(QPid, UpdateFunUnBlock, Queues)};
- false -> {QueueCount, Queues}
- end;
+ true ->
+ case unblock(Mode, QPid, ChPid) of
+ true ->
+ {QueueCount - 1,
+ dict:update(QPid, fun unblock_fun/1, Queues)};
+ false -> {QueueCount, Queues}
+ end;
false -> {QueueCount, Queues}
end
end,
case gb_trees:is_empty(QList1) of
true -> {QueueCount1, Queues1};
- false -> unblock_queue(ChPid, L, QueueCount1, QList1, Queues1)
+ false -> unblock_queue(Mode, ChPid, L, QueueCount1, QList1, Queues1)
end.
-unblock(QPid, ChPid) ->
+unblock(Mode, QPid, ChPid) ->
rabbit_misc:with_exit_handler(
fun () -> false end,
- fun () -> rabbit_amqqueue:unblock(QPid, ChPid) end).
+ fun () -> rabbit_amqqueue:unblock(Mode, QPid, ChPid) end).
+
+unblock_fun({MRef, _, Length}) -> {MRef, false, Length}.