diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-03 13:51:43 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-12-03 13:51:43 +0000 |
commit | 5c23d01c49572a8c1b9a4d732461d3db26beee45 (patch) | |
tree | acd87d22f133078f0383e2e993029ae1cef2681a | |
parent | 2a213f043860e20e836de3d735f4d38debc450a2 (diff) | |
download | rabbitmq-server-5c23d01c49572a8c1b9a4d732461d3db26beee45.tar.gz |
move generic cross-node funs from rabbit_amqqueue to delegate
since the latter changes less frequently
-rw-r--r-- | src/delegate.erl | 12 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 59 |
2 files changed, 32 insertions, 39 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index d595e481..9222c34c 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, invoke_no_result/2, invoke/2]). +-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -35,6 +35,10 @@ [{pid(), term()}]}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). +-spec(call/2 :: + ( pid(), any()) -> any(); + ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}). +-spec(cast/2 :: (pid() | [pid()], any()) -> 'ok'). -endif. @@ -96,6 +100,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) -> safe_invoke(LocalPids, Fun), %% must not die ok. +call(PidOrPids, Msg) -> + invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end). + +cast(PidOrPids, Msg) -> + invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end). + %%---------------------------------------------------------------------------- group_pids_by_node(Pids) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c48aa6dd..3b54c1c3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -438,10 +438,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). -info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info). +info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}) of + case delegate:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -472,7 +472,7 @@ force_event_refresh(QNames) -> wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up). -consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers). +consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -486,27 +486,27 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). delete_immediately(QPids) -> [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids], ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}). + delegate:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> - delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}). + delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). notify_down_all(QPids, ChPid) -> safe_delegate_call_ok( @@ -514,19 +514,18 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, Limiter) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end). + delegate:cast(QPids, {limit, ChPid, Limiter}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}). + delegate:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate_call(QPid, {basic_consume, NoAck, ChPid, + delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> Key = {consumer_credit_to, QPid}, @@ -545,11 +544,9 @@ notify_sent_queue_down(QPid) -> erase({consumer_credit_to, QPid}), ok. -unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}). +unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}). -flush_all(QPids, ChPid) -> - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). +flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}). internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), @@ -588,8 +585,8 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring). -stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring). +start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( @@ -652,10 +649,8 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> %% done with it. MMsg = {deliver, Delivery, false, Flow}, SMsg = {deliver, Delivery, true, Flow}, - delegate:invoke_no_result(MPids, - fun (QPid) -> gen_server2:cast(QPid, MMsg) end), - delegate:invoke_no_result(SPids, - fun (QPid) -> gen_server2:cast(QPid, SMsg) end), + delegate:cast(MPids, MMsg), + delegate:cast(SPids, SMsg), {routed, QPids}; deliver(Qs, Delivery, _Flow) -> @@ -663,14 +658,8 @@ deliver(Qs, Delivery, _Flow) -> %% see comment above MMsg = {deliver, Delivery, false}, SMsg = {deliver, Delivery, true}, - {MRouted, _} = delegate:invoke( - MPids, fun (QPid) -> - ok = gen_server2:call(QPid, MMsg, infinity) - end), - {SRouted, _} = delegate:invoke( - SPids, fun (QPid) -> - ok = gen_server2:call(QPid, SMsg, infinity) - end), + {MRouted, _} = delegate:call(MPids, MMsg), + {SRouted, _} = delegate:call(SPids, SMsg), case MRouted ++ SRouted of [] -> {unroutable, []}; R -> {routed, [QPid || {QPid, ok} <- R]} @@ -696,9 +685,3 @@ safe_delegate_call_ok(F, Pids) -> [] -> ok; Bads1 -> {error, Bads1} end. - -delegate_call(Pid, Msg) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). - -delegate_cast(Pid, Msg) -> - delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). |