summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-12-03 13:51:43 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-12-03 13:51:43 +0000
commit5c23d01c49572a8c1b9a4d732461d3db26beee45 (patch)
treeacd87d22f133078f0383e2e993029ae1cef2681a
parent2a213f043860e20e836de3d735f4d38debc450a2 (diff)
downloadrabbitmq-server-5c23d01c49572a8c1b9a4d732461d3db26beee45.tar.gz
move generic cross-node funs from rabbit_amqqueue to delegate
since the latter changes less frequently
-rw-r--r--src/delegate.erl12
-rw-r--r--src/rabbit_amqqueue.erl59
2 files changed, 32 insertions, 39 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index d595e481..9222c34c 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, invoke_no_result/2, invoke/2]).
+-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -35,6 +35,10 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
+-spec(call/2 ::
+ ( pid(), any()) -> any();
+ ([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
+-spec(cast/2 :: (pid() | [pid()], any()) -> 'ok').
-endif.
@@ -96,6 +100,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
safe_invoke(LocalPids, Fun), %% must not die
ok.
+call(PidOrPids, Msg) ->
+ invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).
+
+cast(PidOrPids, Msg) ->
+ invoke_no_result(PidOrPids, fun (P) -> gen_server2:cast(P, Msg) end).
+
%%----------------------------------------------------------------------------
group_pids_by_node(Pids) ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c48aa6dd..3b54c1c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -438,10 +438,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
-info(#amqqueue{ pid = QPid }) -> delegate_call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}) of
+ case delegate:call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -472,7 +472,7 @@ force_event_refresh(QNames) ->
wake_up(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, wake_up).
-consumers(#amqqueue{ pid = QPid }) -> delegate_call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -486,27 +486,27 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
delete_immediately(QPids) ->
[gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}).
+ delegate:call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) -> delegate_cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
- delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
+ delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
@@ -514,19 +514,18 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, Limiter) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end).
+ delegate:cast(QPids, {limit, ChPid, Limiter}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}).
+ delegate:call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate_call(QPid, {basic_consume, NoAck, ChPid,
+ delegate:call(QPid, {basic_consume, NoAck, ChPid,
Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
Key = {consumer_credit_to, QPid},
@@ -545,11 +544,9 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate_cast(QPid, {unblock, ChPid}).
+unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
-flush_all(QPids, ChPid) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
+flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -588,8 +585,8 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-start_mirroring(QPid) -> ok = delegate_cast(QPid, start_mirroring).
-stop_mirroring(QPid) -> ok = delegate_cast(QPid, stop_mirroring).
+start_mirroring(QPid) -> ok = delegate:cast(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
@@ -652,10 +649,8 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% done with it.
MMsg = {deliver, Delivery, false, Flow},
SMsg = {deliver, Delivery, true, Flow},
- delegate:invoke_no_result(MPids,
- fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
- delegate:invoke_no_result(SPids,
- fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
+ delegate:cast(MPids, MMsg),
+ delegate:cast(SPids, SMsg),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
@@ -663,14 +658,8 @@ deliver(Qs, Delivery, _Flow) ->
%% see comment above
MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
- {MRouted, _} = delegate:invoke(
- MPids, fun (QPid) ->
- ok = gen_server2:call(QPid, MMsg, infinity)
- end),
- {SRouted, _} = delegate:invoke(
- SPids, fun (QPid) ->
- ok = gen_server2:call(QPid, SMsg, infinity)
- end),
+ {MRouted, _} = delegate:call(MPids, MMsg),
+ {SRouted, _} = delegate:call(SPids, SMsg),
case MRouted ++ SRouted of
[] -> {unroutable, []};
R -> {routed, [QPid || {QPid, ok} <- R]}
@@ -696,9 +685,3 @@ safe_delegate_call_ok(F, Pids) ->
[] -> ok;
Bads1 -> {error, Bads1}
end.
-
-delegate_call(Pid, Msg) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
-
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).