diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-11-23 10:03:15 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-11-23 10:03:15 +0000 |
commit | ffb21cd3e869ffb52dba5a9c091468be6bfb9eb9 (patch) | |
tree | 0449128640900015c532f2771cedfb5edb12de1a | |
parent | a4ad123c9c39921465a26326c561c73ef50e1431 (diff) | |
download | rabbitmq-server-ffb21cd3e869ffb52dba5a9c091468be6bfb9eb9.tar.gz |
And now it can't deadlock. Beautiful.
-rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 33 |
3 files changed, 32 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 169c7c6d..20742427 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/3]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -100,7 +100,7 @@ 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> boolean()). +-spec(unblock/3 :: (('sync' | 'async'), pid(), pid()) -> boolean()). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -305,8 +305,11 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). -unblock(QPid, ChPid) -> - gen_server2:pcall(QPid, 8, {unblock, ChPid}). +unblock(sync, QPid, ChPid) -> + gen_server2:pcall(QPid, 8, {unblock, ChPid}, 100); +unblock(async, QPid, ChPid) -> + ok = gen_server2:pcast(QPid, 8, {unblock, ChPid}), + false. internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ca14c3dd..cb53f4e0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -814,7 +814,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end, fun (_, _) -> ok end)). + end, fun (_, _) -> ok end)); + +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (_, _) -> ok end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4e95f37d..413658d8 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -135,7 +135,6 @@ handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - io:format("~p OldVolume ~p ; Count ~p ; NewVolume ~p~n", [self(), Volume, Count, NewVolume]), {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> @@ -177,7 +176,7 @@ forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> case dict:find(QPid, Queues) of {ok, {MRef, _, _}} -> true = erlang:demonitor(MRef), - unblock(QPid, ChPid), + unblock(async, QPid, ChPid), State#lim{queues = dict:erase(QPid, Queues)}; error -> State end. @@ -207,22 +206,22 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues, %% try to tell enough queues that we guarantee we'll get %% blocked again {Capacity1, NewQueues1} = - unblock_queue(ChPid, BiggestLength1, Capacity, QList, + unblock_queue(sync, ChPid, BiggestLength1, Capacity, QList, Queues), case 0 == Capacity1 of true -> NewQueues1; false -> %% just tell everyone {_Capacity2, NewQueues2} = - unblock_queue(ChPid, 1, QCount, QList, NewQueues1), + unblock_queue(async, ChPid, 1, QCount, QList, + NewQueues1), NewQueues2 end end, State#lim{queues = NewQueues}. -unblock_queue(_ChPid, _L, 0, _QList, Queues) -> +unblock_queue(_Mode, _ChPid, _L, 0, _QList, Queues) -> {0, Queues}; -unblock_queue(ChPid, L, QueueCount, QList, Queues) -> - UpdateFunUnBlock = fun ({MRef, _, Length}) -> {MRef, false, Length} end, +unblock_queue(Mode, ChPid, L, QueueCount, QList, Queues) -> {Length, QPid, QList1} = gb_trees:take_largest(QList), {_MRef, Blocked, Length} = dict:fetch(QPid, Queues), {QueueCount1, Queues1} = @@ -232,20 +231,24 @@ unblock_queue(ChPid, L, QueueCount, QList, Queues) -> true -> %% if 0 == Length, and L == 1, we still need to inform the q case Length + 1 >= random:uniform(L) of - true -> case unblock(QPid, ChPid) of - true -> {QueueCount - 1, - dict:update(QPid, UpdateFunUnBlock, Queues)}; - false -> {QueueCount, Queues} - end; + true -> + case unblock(Mode, QPid, ChPid) of + true -> + {QueueCount - 1, + dict:update(QPid, fun unblock_fun/1, Queues)}; + false -> {QueueCount, Queues} + end; false -> {QueueCount, Queues} end end, case gb_trees:is_empty(QList1) of true -> {QueueCount1, Queues1}; - false -> unblock_queue(ChPid, L, QueueCount1, QList1, Queues1) + false -> unblock_queue(Mode, ChPid, L, QueueCount1, QList1, Queues1) end. -unblock(QPid, ChPid) -> +unblock(Mode, QPid, ChPid) -> rabbit_misc:with_exit_handler( fun () -> false end, - fun () -> rabbit_amqqueue:unblock(QPid, ChPid) end). + fun () -> rabbit_amqqueue:unblock(Mode, QPid, ChPid) end). + +unblock_fun({MRef, _, Length}) -> {MRef, false, Length}. |