diff options
author | Simon MacMullen <simon@lshift.net> | 2010-04-12 11:37:21 +0100 |
---|---|---|
committer | Simon MacMullen <simon@lshift.net> | 2010-04-12 11:37:21 +0100 |
commit | b77ef427b978d2a1ccb78f619bab0e8176edef78 (patch) | |
tree | 9105e7ed69287e871721e5c26f14f7cfdf8d9ebb | |
parent | ecd46b56aaf59389b98466dbe9eb6e8a265710d4 (diff) | |
download | rabbitmq-server-b77ef427b978d2a1ccb78f619bab0e8176edef78.tar.gz |
Port various queue operations over to the new delegate system. Unfortunately this doesn't fix the underlying bug!
-rw-r--r-- | src/delegate.erl | 25 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 75 |
2 files changed, 58 insertions, 42 deletions
diff --git a/src/delegate.erl b/src/delegate.erl index c72a7e5a..98075428 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -34,11 +34,16 @@ -behaviour(gen_server2). --export([start_link/1, delegate_cast/2, delegate_call/2, server/1]). +-export([start_link/1, delegate_cast/2, delegate_call/2, + delegate_gs2_call/3, delegate_gs2_pcall/4, + delegate_gs2_cast/2, delegate_gs2_pcast/3, + server/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%%---------------------------------------------------------------------------- + %%---------------------------------------------------------------------------- @@ -46,6 +51,24 @@ start_link(Hash) -> gen_server2:start_link({local, server(Hash)}, ?MODULE, [], []). +delegate_gs2_call(Pid, Msg, Timeout) -> + {Status, Res} = + delegate_call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end), + Res. + +delegate_gs2_pcall(Pid, Pri, Msg, Timeout) -> + {Status, Res} = + delegate_call(Pid, + fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end), + Res. + +delegate_gs2_cast(Pid, Msg) -> + delegate_cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end). + +delegate_gs2_pcast(Pid, Pri, Msg) -> + delegate_cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end). + + delegate_call(Node, Thunk) when is_atom(Node) -> gen_server2:call({server(), Node}, {thunk, Thunk}, infinity); diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ceec00fd..f502b940 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,10 +234,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, info, infinity). + delegate:delegate_gs2_pcall(QPid, 9, info, infinity). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of + case delegate:delegate_gs2_pcall(QPid, 9, {info, Items}, infinity) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -247,7 +247,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - gen_server2:pcall(QPid, 9, consumers, infinity). + delegate:delegate_gs2_pcall(QPid, 9, consumers, infinity). consumers_all(VHostPath) -> lists:concat( @@ -256,15 +256,16 @@ consumers_all(VHostPath) -> {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] end)). -stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). +stat(#amqqueue{pid = QPid}) -> delegate:delegate_gs2_call(QPid, stat, infinity). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate:delegate_gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity). -purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> + delegate:delegate_gs2_call(QPid, purge, infinity). deliver(QPid, #delivery{immediate = true, txn = Txn, sender = ChPid, message = Message}) -> @@ -279,28 +280,26 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> true. redeliver(QPid, Messages) -> - gen_server2:cast(QPid, {redeliver, Messages}). + delegate:delegate_gs2_cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). + delegate:delegate_gs2_cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> - safe_pmap_ok( + safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, QPids). rollback_all(QPids, Txn) -> - safe_pmap_ok( - fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end). notify_down_all(QPids, ChPid) -> - safe_pmap_ok( + safe_delegate_call_ok( %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, @@ -308,38 +307,35 @@ notify_down_all(QPids, ChPid) -> QPids). limit_all(QPids, ChPid, LimiterPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). + delegate:delegate_gs2_call(QPid, {claim_queue, ReaderPid}, infinity). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate:delegate_gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + delegate:delegate_gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate:delegate_gs2_call(QPid, + {basic_cancel, ChPid, ConsumerTag, OkMsg}, + infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 7, {unblock, ChPid}). + delegate:delegate_gs2_pcast(QPid, 7, {unblock, ChPid}). flush_all(QPids, ChPid) -> - safe_pmap_ok( - fun (_) -> ok end, - fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, - QPids). + delegate:delegate_cast(QPids, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end). internal_delete(QueueName) -> case @@ -385,17 +381,14 @@ pseudo_queue(QueueName, Pid) -> arguments = [], pid = Pid}. -safe_pmap_ok(H, F, L) -> - case [R || R <- rabbit_misc:upmap( - fun (V) -> - try - rabbit_misc:with_exit_handler( - fun () -> H(V) end, - fun () -> F(V) end) - catch Class:Reason -> {Class, Reason} - end - end, L), - R =/= ok] of +safe_delegate_call_ok(H, F, Pids) -> + case [R || R = {error, _, _} <- delegate:delegate_call( + Pids, + fun (Pid) -> + rabbit_misc:with_exit_handler( + fun () -> H(Pid) end, + fun () -> F(Pid) end) + end)] of [] -> ok; Errors -> {error, Errors} end. |