summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-09-30 14:14:01 +0100
committerMatthias Radestock <matthias@lshift.net>2008-09-30 14:14:01 +0100
commit2d0364fd47662b0038d6fb9a7aef905c3465e7bc (patch)
tree4d83b52d131f0fa63afefd1631a82649b96b32fc
parent58231b7d62d25f98755800c4bdfa92d239e7a6e3 (diff)
downloadrabbitmq-server-2d0364fd47662b0038d6fb9a7aef905c3465e7bc.tar.gz
modulate gen_server:call timeout when doing work in parallel
When we fire off lots of gen_server:calls in parallel, we may create enough work for the VM to cause the calls to time out - since the amount of work that can actually be done in parallel is finite. The fix is to adjust the timeout based on the total workload. Alternatively we could not have any timeout at all, but that is bad Erlang style since a small error somewhere could result in stuck processes. I moved the parallelisation - and hence timeout modulation - from the channel into the amqqueue module, changing the API in the process - commit, rollback and notify_down now all operate on lists of QPids (and I've renamed the functions to make that clear). The alternative would have been to add Timeout params to these three functions, but I reckon the API is cleaner this way, particularly considering that rollback doesn't actually do a call - it does a cast and hence doesn't require a timeout - so in the alternative API we'd either have to expose that fact indirectly by not having a Timeout param, or have a bogus Timeout param, neither of which is particularly appealing. I considered making the functions take sets instead of lists, since that's what the channel code produces, plus sets have a more efficient length operation. However, API-wise I reckon lists are nicer, plus it means I can give a more precise type to dialyzer - sets would be opaque and non-polymorphic.
-rw-r--r--src/rabbit_amqqueue.erl61
-rw-r--r--src/rabbit_channel.erl52
2 files changed, 63 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 7ce350d8..bcb724ea 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -28,12 +28,12 @@
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
- stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4,
- commit/2, rollback/2]).
+ stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_down/2]).
+-export([notify_sent/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
-import(mnesia).
@@ -44,6 +44,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(CALL_TIMEOUT, 5000).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -53,6 +55,9 @@
-type(qfun(A) :: fun ((amqqueue()) -> A)).
-type(bind_res() :: {'ok', non_neg_integer()} |
{'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(ok_or_errors() ::
+ 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
+
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
@@ -81,9 +86,9 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit/2 :: (pid(), txn()) -> 'ok').
--spec(rollback/2 :: (pid(), txn()) -> 'ok').
--spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok').
+-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(notify_down_all/2 :: ([amqqueue()], pid()) -> ok_or_errors()).
-spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok').
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
@@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
-commit(QPid, Txn) ->
- gen_server:call(QPid, {commit, Txn}).
-
-rollback(QPid, Txn) ->
- gen_server:cast(QPid, {rollback, Txn}).
-
-notify_down(#amqqueue{ pid = QPid }, ChPid) ->
- gen_server:call(QPid, {notify_down, ChPid}).
+commit_all(QPids, Txn) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ QPids).
+
+rollback_all(QPids, Txn) ->
+ safe_pmap_ok(
+ fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ QPids).
+
+notify_down_all(QPids, ChPid) ->
+ Timeout = length(QPids) * ?CALL_TIMEOUT,
+ safe_pmap_ok(
+ fun (QPid) ->
+ rabbit_misc:with_exit_handler(
+ %% we don't care if the queue process has terminated
+ %% in the meantime
+ fun () -> ok end,
+ fun () -> gen_server:call(QPid, {notify_down, ChPid},
+ Timeout) end)
+ end,
+ QPids).
binding_forcibly_removed(BindingSpec, QueueName) ->
rabbit_misc:execute_mnesia_transaction(
@@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
binding_specs = [],
pid = Pid}.
+
+safe_pmap_ok(F, L) ->
+ case lists:filter(fun (R) -> R =/= ok end,
+ rabbit_misc:upmap(
+ fun (V) ->
+ try F(V)
+ catch Class:Reason -> {Class, Reason}
+ end
+ end, L)) of
+ [] -> ok;
+ Errors -> {error, Errors}
+ end.
+
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5cc07aed..a9278898 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) ->
make_tx_id() -> rabbit_misc:guid().
-safe_pmap_set_ok(F, S) ->
- case lists:filter(fun (R) -> R =/= ok end,
- rabbit_misc:upmap(
- fun (V) ->
- try F(V)
- catch Class:Reason -> {Class, Reason}
- end
- end, sets:to_list(S))) of
- [] -> ok;
- Errors -> {error, Errors}
- end.
-
-notify_participants(F, TxnKey, Participants) ->
- safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants).
-
new_tx(State) ->
State#ch{transaction_id = make_tx_id(),
tx_participants = sets:new(),
@@ -729,8 +714,8 @@ new_tx(State) ->
internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
- case notify_participants(fun rabbit_amqqueue:commit/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:commit_all(sets:to_list(Participants),
+ TxnKey) of
ok -> new_tx(State);
{error, Errors} -> exit({commit_failed, Errors})
end.
@@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
[self(),
queue:len(UAQ),
queue:len(UAMQ)]),
- case notify_participants(fun rabbit_amqqueue:rollback/2,
- TxnKey, Participants) of
+ case rabbit_amqqueue:rollback_all(sets:to_list(Participants),
+ TxnKey) of
ok -> NewUAMQ = queue:join(UAQ, UAMQ),
new_tx(State#ch{unacked_message_q = NewUAMQ});
{error, Errors} -> exit({rollback_failed, Errors})
@@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- safe_pmap_set_ok(
- fun (QueueName) ->
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:notify_down(Q, ProxyPid)
- end) of
- ok ->
- ok;
- {error, not_found} ->
- %% queue has been deleted in the meantime
- ok
- end
- end,
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)).
+ rabbit_amqqueue:notify_down_all(
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end],
+ ProxyPid).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->