diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-28 11:16:50 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-28 11:16:50 +0100 |
commit | 0a60ed986a73c267a7c09a2297dc4329e6fbfa88 (patch) | |
tree | 329677d8adcd429f82b54ccafbac24c710c04634 | |
parent | 5a8df1c06a8222cc61044f039b040afb481731a6 (diff) | |
download | rabbitmq-server-0a60ed986a73c267a7c09a2297dc4329e6fbfa88.tar.gz |
Move the gen_server2-via-delegate code into amqqueue; it should only be used there anyway.
-rw-r--r-- | src/delegate.erl | 26 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 46 |
2 files changed, 32 insertions, 40 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index c72a3afb..71287496 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -35,8 +35,6 @@ -behaviour(gen_server2). -export([start_link/1, invoke_async/2, invoke/2, - gs2_call/3, gs2_pcall/4, - gs2_cast/2, gs2_pcast/3, server/1, process_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -52,13 +50,6 @@ -spec(invoke_async/2 :: (pid() | [pid()], fun((pid()) -> any())) -> 'ok'). -spec(invoke/2 :: (pid() | [pid()], fun((pid()) -> A)) -> A). --spec(gs2_call/3 :: - (serverref(), any(), non_neg_integer() | 'infinity') -> any()). --spec(gs2_pcall/4 :: - (serverref(), number(), any(), non_neg_integer() | 'infinity') -> any()). --spec(gs2_cast/2 :: (serverref(), any()) -> 'ok'). --spec(gs2_pcast/3 :: (serverref(), number(), any()) -> 'ok'). - -spec(server/1 :: (node() | non_neg_integer()) -> atom()). -spec(process_count/0 :: () -> non_neg_integer()). @@ -70,23 +61,6 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). -gs2_call(Pid, Msg, Timeout) -> - {_Status, Res} = - invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), - Res. - -gs2_pcall(Pid, Pri, Msg, Timeout) -> - {_Status, Res} = - invoke(Pid, fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), - Res. - -gs2_cast(Pid, Msg) -> - invoke_async(Pid, fun(P) -> gen_server2:cast(P, Msg) end). - -gs2_pcast(Pid, Pri, Msg) -> - invoke_async(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). - - invoke(Pid, FPid) when is_pid(Pid) -> [{Status, Res, _}] = invoke_per_node([{node(Pid), [Pid]}], FPid), {Status, Res}; diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a5fc4181..ec2368fb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -225,10 +225,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate:gs2_pcall(QPid, 9, info, infinity). + delegate_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case delegate:gs2_pcall(QPid, 9, {info, Items}, infinity) of + case delegate_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -238,7 +238,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate:gs2_pcall(QPid, 9, consumers, infinity). + delegate_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -247,16 +247,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> delegate:gs2_call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate:gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). purge(#amqqueue{ pid = QPid }) -> - delegate:gs2_call(QPid, purge, infinity). + delegate_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -271,10 +271,10 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate:gs2_cast(QPid, {requeue, MsgIds, ChPid}). + delegate_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - delegate:gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( @@ -299,26 +299,26 @@ limit_all(QPids, ChPid, LimiterPid) -> fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - delegate:gs2_call(QPid, {claim_queue, ReaderPid}, infinity). + delegate_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate:gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate:gs2_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, infinity). notify_sent(QPid, ChPid) -> - delegate:gs2_pcast(QPid, 7, {notify_sent, ChPid}). + delegate_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - delegate:gs2_pcast(QPid, 7, {unblock, ChPid}). + delegate_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> delegate:invoke_async(QPids, @@ -379,3 +379,21 @@ safe_delegate_call_ok(H, F, Pids) -> [] -> ok; Errors -> {error, Errors} end. + +delegate_call(Pid, Msg, Timeout) -> + {_Status, Res} = + delegate:invoke(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), + Res. + +delegate_pcall(Pid, Pri, Msg, Timeout) -> + {_Status, Res} = + delegate:invoke(Pid, + fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), + Res. + +delegate_cast(Pid, Msg) -> + delegate:invoke_async(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + +delegate_pcast(Pid, Pri, Msg) -> + delegate:invoke_async(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + |