summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-28 11:16:50 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-28 11:16:50 +0100
commit0a60ed986a73c267a7c09a2297dc4329e6fbfa88 (patch)
tree329677d8adcd429f82b54ccafbac24c710c04634
parent5a8df1c06a8222cc61044f039b040afb481731a6 (diff)
downloadrabbitmq-server-0a60ed986a73c267a7c09a2297dc4329e6fbfa88.tar.gz
Move the gen_server2-via-delegate code into amqqueue; it should only be used there anyway.
-rw-r--r--src/delegate.erl26
-rw-r--r--src/rabbit_amqqueue.erl46
2 files changed, 32 insertions, 40 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index c72a3afb..71287496 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -35,8 +35,6 @@
-behaviour(gen_server2).
-export([start_link/1, invoke_async/2, invoke/2,
- gs2_call/3, gs2_pcall/4,
- gs2_cast/2, gs2_pcast/3,
server/1, process_count/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -52,13 +50,6 @@
-spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok').
-spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A).
--spec(gs2_call/3 ::
- (serverref(), any(), non_neg_integer() | 'infinity') -> any()).
--spec(gs2_pcall/4 ::
- (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()).
--spec(gs2_cast/2 :: (serverref(), any()) -> 'ok').
--spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok').
-
-spec(server/1 :: (node() | non_neg_integer()) -> atom()).
-spec(process_count/0 :: () -> non_neg_integer()).
@@ -70,23 +61,6 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)},
?MODULE, [], []).
-gs2_call(Pid, Msg, Timeout) ->
- {_Status, Res} =
- invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end),
- Res.
-
-gs2_pcall(Pid, Pri, Msg, Timeout) ->
- {_Status, Res} =
- invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end),
- Res.
-
-gs2_cast(Pid, Msg) ->
- invoke_async(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
-
-gs2_pcast(Pid, Pri, Msg) ->
- invoke_async(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
-
-
invoke(Pid, FPid) when is_pid(Pid) ->
[{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], FPid),
{Status, Res};
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a5fc4181..ec2368fb 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -225,10 +225,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate:gs2_pcall(QPid, 9, info, infinity).
+ delegate_pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate_pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -238,7 +238,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate:gs2_pcall(QPid, 9, consumers, infinity).
+ delegate_pcall(QPid, 9, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -247,16 +247,16 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
purge(#amqqueue{ pid = QPid }) ->
- delegate:gs2_call(QPid, purge, infinity).
+ delegate_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
txn = Txn, sender = ChPid, message = Message}) ->
@@ -271,10 +271,10 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
@@ -299,26 +299,26 @@ limit_all(QPids, ChPid, LimiterPid) ->
fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity).
+ delegate_call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
infinity).
notify_sent(QPid, ChPid) ->
- delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate_pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- delegate:gs2_pcast(QPid, 7, {unblock, ChPid}).
+ delegate_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
delegate:invoke_async(QPids,
@@ -379,3 +379,21 @@ safe_delegate_call_ok(H, F, Pids) ->
[] -> ok;
Errors -> {error, Errors}
end.
+
+delegate_call(Pid, Msg, Timeout) ->
+ {_Status, Res} =
+ delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end),
+ Res.
+
+delegate_pcall(Pid, Pri, Msg, Timeout) ->
+ {_Status, Res} =
+ delegate:invoke(Pid,
+ fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end),
+ Res.
+
+delegate_cast(Pid, Msg) ->
+ delegate:invoke_async(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+
+delegate_pcast(Pid, Pri, Msg) ->
+ delegate:invoke_async(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+