summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-02-23 13:50:38 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-02-23 13:50:38 +0000
commit6bfc31d5ad164c82fd83f39ec019aa94f96525b8 (patch)
tree827493c951daf439e9cea382fa7c85ff4e01cf22 /src/rabbit_amqqueue.erl
parent36504e1c72c0d85e4c34d0be9050ad7b9cc79202 (diff)
parent1ee22ba19d1cdfab15811b75d6a4b7a3020eb38d (diff)
downloadrabbitmq-server-6bfc31d5ad164c82fd83f39ec019aa94f96525b8.tar.gz
Merging bug23727 to bug23554
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl52
1 files changed, 20 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4ef9750c..155e1317 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -199,7 +199,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
mirror_pids = []}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}) of
+ case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
end.
@@ -302,29 +302,19 @@ check_declare_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s: ~w",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_expires_argument/1},
- {<<"x-message-ttl">>, fun check_message_ttl_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/1},
+ {<<"x-message-ttl">>, fun check_integer_argument/1}]],
ok.
-check_expires_argument(Val) ->
- check_integer_argument(Val,
- expires_not_of_acceptable_type,
- expires_zero_or_less).
-
-check_message_ttl_argument(Val) ->
- check_integer_argument(Val,
- ttl_not_of_acceptable_type,
- ttl_zero_or_less).
-
-check_integer_argument(undefined, _, _) ->
+check_integer_argument(undefined) ->
ok;
-check_integer_argument({Type, Val}, InvalidTypeError, _) when Val > 0 ->
+check_integer_argument({Type, Val}) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
- false -> {error, {InvalidTypeError, Type, Val}}
+ false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, _Val}, _, ZeroOrLessError) ->
- {error, ZeroOrLessError}.
+check_integer_argument({_Type, Val}) ->
+ {error, {value_zero_or_less, Val}}.
list(VHostPath) ->
mnesia:dirty_match_object(
@@ -336,10 +326,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, info, infinity).
+ delegate_call(QPid, info).
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate_call(QPid, {info, Items}, infinity) of
+ case delegate_call(QPid, {info, Items}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -349,7 +339,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
consumers(#amqqueue{ pid = QPid }) ->
- delegate_call(QPid, consumers, infinity).
+ delegate_call(QPid, consumers).
consumers_all(VHostPath) ->
lists:append(
@@ -359,7 +349,7 @@ consumers_all(VHostPath) ->
end)).
stat(#amqqueue{pid = QPid}) ->
- delegate_call(QPid, stat, infinity).
+ delegate_call(QPid, stat).
emit_stats(#amqqueue{pid = QPid}) ->
delegate_cast(QPid, emit_stats).
@@ -368,9 +358,9 @@ delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity).
+ delegate_call(QPid, {delete, IfUnused, IfEmpty}).
-purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity).
+purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
deliver(QPid, Delivery = #delivery{immediate = true}) ->
gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
@@ -382,7 +372,7 @@ deliver(QPid, Delivery) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
@@ -411,17 +401,15 @@ limit_all(QPids, ChPid, LimiterPid) ->
end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity).
+ delegate_call(QPid, {basic_get, ChPid, NoAck}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
- infinity).
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
- infinity).
+ ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
@@ -516,8 +504,8 @@ safe_delegate_call_ok(F, Pids) ->
{_, Bad} -> {error, Bad}
end.
-delegate_call(Pid, Msg, Timeout) ->
- delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end).
+delegate_call(Pid, Msg) ->
+ delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end).
delegate_cast(Pid, Msg) ->
delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).