summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@lshift.net>2010-04-12 11:37:21 +0100
committerSimon MacMullen <simon@lshift.net>2010-04-12 11:37:21 +0100
commitb77ef427b978d2a1ccb78f619bab0e8176edef78 (patch)
tree9105e7ed69287e871721e5c26f14f7cfdf8d9ebb
parentecd46b56aaf59389b98466dbe9eb6e8a265710d4 (diff)
downloadrabbitmq-server-b77ef427b978d2a1ccb78f619bab0e8176edef78.tar.gz
Port various queue operations over to the new delegate system. Unfortunately this doesn't fix the underlying bug!
-rw-r--r--src/delegate.erl25
-rw-r--r--src/rabbit_amqqueue.erl75
2 files changed, 58 insertions, 42 deletions
diff --git a/src/delegate.erl b/src/delegate.erl
index c72a7e5a..98075428 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -34,11 +34,16 @@
-behaviour(gen_server2).
--export([start_link/1, delegate_cast/2, delegate_call/2, server/1]).
+-export([start_link/1, delegate_cast/2, delegate_call/2,
+ delegate_gs2_call/3, delegate_gs2_pcall/4,
+ delegate_gs2_cast/2, delegate_gs2_pcast/3,
+ server/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+%%----------------------------------------------------------------------------
+
%%----------------------------------------------------------------------------
@@ -46,6 +51,24 @@ start_link(Hash) ->
gen_server2:start_link({local, server(Hash)},
?MODULE, [], []).
+delegate_gs2_call(Pid, Msg, Timeout) ->
+ {Status, Res} =
+ delegate_call(Pid, fun(P) -> gen_server2:call(P, Msg, Timeout) end),
+ Res.
+
+delegate_gs2_pcall(Pid, Pri, Msg, Timeout) ->
+ {Status, Res} =
+ delegate_call(Pid,
+ fun(P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end),
+ Res.
+
+delegate_gs2_cast(Pid, Msg) ->
+ delegate_cast(Pid, fun(P) -> gen_server2:cast(P, Msg) end).
+
+delegate_gs2_pcast(Pid, Pri, Msg) ->
+ delegate_cast(Pid, fun(P) -> gen_server2:pcast(P, Pri, Msg) end).
+
+
delegate_call(Node, Thunk) when is_atom(Node) ->
gen_server2:call({server(), Node}, {thunk, Thunk}, infinity);
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ceec00fd..f502b940 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -234,10 +234,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, info, infinity).
+ delegate:delegate_gs2_pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
+ case delegate:delegate_gs2_pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -247,7 +247,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- gen_server2:pcall(QPid, 9, consumers, infinity).
+ delegate:delegate_gs2_pcall(QPid, 9, consumers, infinity).
consumers_all(VHostPath) ->
lists:concat(
@@ -256,15 +256,16 @@ consumers_all(VHostPath) ->
{ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
end)).
-stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
+stat(#amqqueue{pid = QPid}) -> delegate:delegate_gs2_call(QPid, stat, infinity).
stat_all() ->
lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate:delegate_gs2_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) ->
+ delegate:delegate_gs2_call(QPid, purge, infinity).
deliver(QPid, #delivery{immediate = true,
txn = Txn, sender = ChPid, message = Message}) ->
@@ -279,28 +280,26 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
redeliver(QPid, Messages) ->
- gen_server2:cast(QPid, {redeliver, Messages}).
+ delegate:delegate_gs2_cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate:delegate_gs2_cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+ delegate:delegate_gs2_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
- safe_pmap_ok(
- fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
- QPids).
+ delegate:delegate_cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end).
notify_down_all(QPids, ChPid) ->
- safe_pmap_ok(
+ safe_delegate_call_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
@@ -308,38 +307,35 @@ notify_down_all(QPids, ChPid) ->
QPids).
limit_all(QPids, ChPid, LimiterPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
- QPids).
+ delegate:delegate_cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end).
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
+ delegate:delegate_gs2_call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate:delegate_gs2_call(QPid, {basic_get, ChPid, NoAck}, infinity).
basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ delegate:delegate_gs2_call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate:delegate_gs2_call(QPid,
+ {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {notify_sent, ChPid}).
+ delegate:delegate_gs2_pcast(QPid, 7, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 7, {unblock, ChPid}).
+ delegate:delegate_gs2_pcast(QPid, 7, {unblock, ChPid}).
flush_all(QPids, ChPid) ->
- safe_pmap_ok(
- fun (_) -> ok end,
- fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end,
- QPids).
+ delegate:delegate_cast(QPids,
+ fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end).
internal_delete(QueueName) ->
case
@@ -385,17 +381,14 @@ pseudo_queue(QueueName, Pid) ->
arguments = [],
pid = Pid}.
-safe_pmap_ok(H, F, L) ->
- case [R || R <- rabbit_misc:upmap(
- fun (V) ->
- try
- rabbit_misc:with_exit_handler(
- fun () -> H(V) end,
- fun () -> F(V) end)
- catch Class:Reason -> {Class, Reason}
- end
- end, L),
- R =/= ok] of
+safe_delegate_call_ok(H, F, Pids) ->
+ case [R || R = {error, _, _} <- delegate:delegate_call(
+ Pids,
+ fun (Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> H(Pid) end,
+ fun () -> F(Pid) end)
+ end)] of
[] -> ok;
Errors -> {error, Errors}
end.