diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-01-08 22:04:25 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-01-08 22:04:25 +0000 |
commit | aa8401fe1ae1a020e62afcc8c43d1ee3f2142602 (patch) | |
tree | 2f599b1ed37468728d60059bf559c7f6dbe4eb82 | |
parent | 014f350aec053a8ca7dfb489d8227a7cdc64a769 (diff) | |
download | rabbitmq-server-bug19759.tar.gz |
replace all cross-node gen_server:casts with gen_server2:castsbug19759
The latter is order-preserving, whereas the former isn't.
Rather than just replacing the casts, I replaced all uses of
gen_server with gen_server2 in the affected modules. That way we don't
mix modules when talking to a given process.
-rw-r--r-- | src/rabbit_amqqueue.erl | 40 | ||||
-rw-r--r-- | src/rabbit_router.erl | 2 |
2 files changed, 21 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..cf4c324d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -43,7 +43,7 @@ -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -197,10 +197,10 @@ list(VHostPath) -> map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - gen_server:call(QPid, info). + gen_server2:call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case gen_server:call(QPid, {info, Items}) of + case gen_server2:call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -209,45 +209,45 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat). stat_all() -> lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). + gen_server2:call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge). deliver(_IsMandatory, true, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver_immediately, Txn, Message}); + gen_server2:call(QPid, {deliver_immediately, Txn, Message}); deliver(true, _IsImmediate, Txn, Message, QPid) -> - gen_server:call(QPid, {deliver, Txn, Message}), + gen_server2:call(QPid, {deliver, Txn, Message}), true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> Timeout = length(QPids) * ?CALL_TIMEOUT, safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, Timeout) end, QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -256,25 +256,25 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, + fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, Timeout) end, QPids). claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> - gen_server:call(QPid, {claim_queue, ReaderPid}). + gen_server2:call(QPid, {claim_queue, ReaderPid}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - gen_server:call(QPid, {basic_get, ChPid, NoAck}). + gen_server2:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). + ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index ad653a2f..0b36a53c 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids |