diff options
author | Hubert Plociniczak <hubert@lshift.net> | 2008-10-05 16:58:53 +0100 |
---|---|---|
committer | Hubert Plociniczak <hubert@lshift.net> | 2008-10-05 16:58:53 +0100 |
commit | 8c1a76ad5c03b66534444347bd8f3a9a3214a1e5 (patch) | |
tree | e2205f208c38b11479641080801a6e58ba9153f5 | |
parent | 9566f56e6e3ebb0cd34538a54072d441ef155e69 (diff) | |
parent | edbc8f0a784fb374cb431db83bd0811a86a3a2a5 (diff) | |
download | rabbitmq-server-8c1a76ad5c03b66534444347bd8f3a9a3214a1e5.tar.gz |
merge bug19485 into default
-rw-r--r-- | src/rabbit_amqqueue.erl | 61 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 52 |
2 files changed, 63 insertions, 50 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 7ce350d8..bd64f1e4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -28,12 +28,12 @@ -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, - stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4, - commit/2, rollback/2]). + stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([add_binding/4, delete_binding/4, binding_forcibly_removed/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2, notify_down/2]). +-export([notify_sent/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). -import(mnesia). @@ -44,6 +44,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(CALL_TIMEOUT, 5000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -53,6 +55,9 @@ -type(qfun(A) :: fun ((amqqueue()) -> A)). -type(bind_res() :: {'ok', non_neg_integer()} | {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(ok_or_errors() :: + 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). + -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> @@ -81,9 +86,9 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok'). --spec(rollback/2 :: (pid(), txn()) -> 'ok'). --spec(notify_down/2 :: (amqqueue(), pid()) -> 'ok'). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(binding_forcibly_removed/2 :: (binding_spec(), queue_name()) -> 'ok'). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> @@ -287,14 +292,29 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). -commit(QPid, Txn) -> - gen_server:call(QPid, {commit, Txn}). - -rollback(QPid, Txn) -> - gen_server:cast(QPid, {rollback, Txn}). - -notify_down(#amqqueue{ pid = QPid }, ChPid) -> - gen_server:call(QPid, {notify_down, ChPid}). +commit_all(QPids, Txn) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + QPids). + +rollback_all(QPids, Txn) -> + safe_pmap_ok( + fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + QPids). + +notify_down_all(QPids, ChPid) -> + Timeout = length(QPids) * ?CALL_TIMEOUT, + safe_pmap_ok( + fun (QPid) -> + rabbit_misc:with_exit_handler( + %% we don't care if the queue process has terminated + %% in the meantime + fun () -> ok end, + fun () -> gen_server:call(QPid, {notify_down, ChPid}, + Timeout) end) + end, + QPids). binding_forcibly_removed(BindingSpec, QueueName) -> rabbit_misc:execute_mnesia_transaction( @@ -367,3 +387,16 @@ pseudo_queue(QueueName, Pid) -> arguments = [], binding_specs = [], pid = Pid}. + +safe_pmap_ok(F, L) -> + case [R || R <- rabbit_misc:upmap( + fun (V) -> + try F(V) + catch Class:Reason -> {Class, Reason} + end + end, L), + R =/= ok] of + [] -> ok; + Errors -> {error, Errors} + end. + diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5cc07aed..a9278898 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -707,21 +707,6 @@ ack(ProxyPid, TxnKey, UAQ) -> make_tx_id() -> rabbit_misc:guid(). -safe_pmap_set_ok(F, S) -> - case lists:filter(fun (R) -> R =/= ok end, - rabbit_misc:upmap( - fun (V) -> - try F(V) - catch Class:Reason -> {Class, Reason} - end - end, sets:to_list(S))) of - [] -> ok; - Errors -> {error, Errors} - end. - -notify_participants(F, TxnKey, Participants) -> - safe_pmap_set_ok(fun (QPid) -> F(QPid, TxnKey) end, Participants). - new_tx(State) -> State#ch{transaction_id = make_tx_id(), tx_participants = sets:new(), @@ -729,8 +714,8 @@ new_tx(State) -> internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> - case notify_participants(fun rabbit_amqqueue:commit/2, - TxnKey, Participants) of + case rabbit_amqqueue:commit_all(sets:to_list(Participants), + TxnKey) of ok -> new_tx(State); {error, Errors} -> exit({commit_failed, Errors}) end. @@ -743,8 +728,8 @@ internal_rollback(State = #ch{transaction_id = TxnKey, [self(), queue:len(UAQ), queue:len(UAMQ)]), - case notify_participants(fun rabbit_amqqueue:rollback/2, - TxnKey, Participants) of + case rabbit_amqqueue:rollback_all(sets:to_list(Participants), + TxnKey) of ok -> NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}); {error, Errors} -> exit({rollback_failed, Errors}) @@ -767,23 +752,18 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - safe_pmap_set_ok( - fun (QueueName) -> - case rabbit_amqqueue:with( - QueueName, - fun (Q) -> - rabbit_amqqueue:notify_down(Q, ProxyPid) - end) of - ok -> - ok; - {error, not_found} -> - %% queue has been deleted in the meantime - ok - end - end, - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)). + rabbit_amqqueue:notify_down_all( + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end], + ProxyPid). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> |