diff options
author | Matthias Radestock <matthias@lshift.net> | 2008-09-30 14:14:01 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2008-09-30 14:14:01 +0100 |
commit | 2d0364fd47662b0038d6fb9a7aef905c3465e7bc (patch) | |
tree | 4d83b52d131f0fa63afefd1631a82649b96b32fc | |
parent | 58231b7d62d25f98755800c4bdfa92d239e7a6e3 (diff) | |
download | rabbitmq-server-2d0364fd47662b0038d6fb9a7aef905c3465e7bc.tar.gz |
modulate gen_server:call timeout when doing work in parallel
When we fire off lots of gen_server:calls in parallel, we may create
enough work for the VM to cause the calls to time out - since the
amount of work that can actually be done in parallel is finite.
The fix is to adjust the timeout based on the total workload.
Alternatively we could not have any timeout at all, but
that is bad Erlang style since a small error somewhere could result in
stuck processes.
I moved the parallelisation - and hence timeout modulation - from the
channel into the amqqueue module, changing the API in the process -
commit, rollback and notify_down now all operate on lists of
QPids (and I've renamed the functions to make that clear). The
alternative would have been to add Timeout params to these
three functions, but I reckon the API is cleaner this way,
particularly considering that rollback doesn't actually do a call - it
does a cast and hence doesn't require a timeout - so in the
alternative API we'd either have to expose that fact indirectly by not
having a Timeout param, or have a bogus Timeout param, neither of
which is particularly appealing.
I considered making the functions take sets instead of lists, since
that's what the channel code produces, plus sets have a more efficient
length operation. However, API-wise I reckon lists are nicer, plus it
means I can give a more precise type to dialyzer - sets would be
opaque and non-polymorphic.
-rw-r--r-- | src/rabbit_amqqueue.erl | 61 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 52 |
2 files changed, 63 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7ce350d8..bcb724ea 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,12 +28,12 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, - commit/2, rollback/2]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_down/2]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). @@ -44,6 +44,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(CALL_TIMEOUT, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -53,6 +55,9 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(bind_res() :: {'ok', non_neg_integer()} | {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -81,9 +86,9 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). --spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(notify_down_all/2 :: ([amqqueue()], pid()) -> ok_or_errors()). -spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> @@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). -commit(QPid, Txn) -> - gen_server:call(QPid, {commit, Txn}). - -rollback(QPid, Txn) -> - gen_server:cast(QPid, {rollback, Txn}). - -notify_down(#amqqueue{ pid = QPid }, ChPid) -> - gen_server:call(QPid, {notify_down, ChPid}). +commit_all(QPids, Txn) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + QPids). + +rollback_all(QPids, Txn) -> + safe_pmap_ok( + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + QPids). + +notify_down_all(QPids, ChPid) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> + rabbit_misc:with_exit_handler( + %% we don't care if the queue process has terminated + %% in the meantime + fun () -> ok end, + fun () -> gen_server:call(QPid, {notify_down, ChPid}, + Timeout) end) + end, + QPids). binding_forcibly_removed(BindingSpec, QueueName) -> rabbit_misc:execute_mnesia_transaction( @@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) -> arguments = [], binding_specs = [], pid = Pid}. + +safe_pmap_ok(F, L) -> + case lists:filter(fun (R) -> R =/= ok end, + rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, L)) of + [] -> ok; + Errors -> {error, Errors} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5cc07aed..a9278898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) -> make_tx_id() -> rabbit_misc:guid(). -safe_pmap_set_ok(F, S) -> - case lists:filter(fun (R) -> R =/= ok end, - rabbit_misc:upmap( - fun (V) -> - try F(V) - catch Class:Reason -> {Class, Reason} - end - end, sets:to_list(S))) of - [] -> ok; - Errors -> {error, Errors} - end. - -notify_participants(F, TxnKey, Participants) -> - safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). - new_tx(State) -> State#ch{transaction_id = make_tx_id(), tx_participants = sets:new(), @@ -729,8 +714,8 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> - case notify_participants(fun rabbit_amqqueue:commit/2, - TxnKey, Participants) of + case rabbit_amqqueue:commit_all(sets:to_list(Participants), + TxnKey) of ok -> new_tx(State); {error, Errors} -> exit({commit_failed, Errors}) end. @@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case notify_participants(fun rabbit_amqqueue:rollback/2, - TxnKey, Participants) of + case rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> exit({rollback_failed, Errors}) @@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - safe_pmap_set_ok( - fun (QueueName) -> - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:notify_down(Q, ProxyPid) - end) of - ok -> - ok; - {error, not_found} -> - %% queue has been deleted in the meantime - ok - end - end, - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)). + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> |