summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 10:48:38 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 10:48:38 +0000
commit867bf496f0f3917bb3109b17464e7a3c5da20ae8 (patch)
tree82ec85486d770ff6a0464ea71a0b40a9772cce25
parentd4fa5254102756b8af4f95822d04285766346f31 (diff)
downloadrabbitmq-server-867bf496f0f3917bb3109b17464e7a3c5da20ae8.tar.gz
shorten maybe_run_queue_via_backing_queue
to something less misleading though arguably still quite obscure Also move make it clear in the amqqueue API which exports are genuine and which are for internal use only.
-rw-r--r--src/rabbit_amqqueue.erl25
-rw-r--r--src/rabbit_amqqueue_process.erl61
2 files changed, 43 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8e4ca8e3..0adaaa7f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -17,23 +17,24 @@
-module(rabbit_amqqueue).
-export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]).
--export([internal_declare/2, internal_delete/1,
- maybe_run_queue_via_backing_queue/2,
- maybe_run_queue_via_backing_queue_async/2,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
--export([emit_stats/1]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+%% internal
+-export([internal_declare/2, internal_delete/1,
+ run_backing_queue/2, run_backing_queue_async/2,
+ sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
+ set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
+ emit_stats/1]).
+
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -140,9 +141,9 @@
rabbit_types:connection_exit() |
fun ((boolean()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit())).
--spec(maybe_run_queue_via_backing_queue/2 ::
+-spec(run_backing_queue/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
--spec(maybe_run_queue_via_backing_queue_async/2 ::
+-spec(run_backing_queue_async/2 ::
(pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok').
-spec(sync_timeout/1 :: (pid()) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
@@ -438,11 +439,11 @@ internal_delete(QueueName) ->
end
end).
-maybe_run_queue_via_backing_queue(QPid, Fun) ->
- gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
+run_backing_queue(QPid, Fun) ->
+ gen_server2:call(QPid, {run_backing_queue, Fun}, infinity).
-maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
- gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+run_backing_queue_async(QPid, Fun) ->
+ gen_server2:cast(QPid, {run_backing_queue, Fun}).
sync_timeout(QPid) ->
gen_server2:cast(QPid, sync_timeout).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 460a97ce..55ee2ee3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -163,14 +163,14 @@ bq_init(BQ, QName, IsDurable, Recover) ->
Self = self(),
BQ:init(QName, IsDurable, Recover,
fun (Fun) ->
- rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- Self, Fun)
+ rabbit_amqqueue:run_backing_queue_async(Self, Fun)
end,
fun (Fun) ->
rabbit_misc:with_exit_handler(
fun () -> error end,
- fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- Self, Fun) end)
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Fun)
+ end)
end).
process_args(State = #q{q = #amqqueue{arguments = Arguments}}) ->
@@ -517,7 +517,7 @@ deliver_or_enqueue(Delivery, State) ->
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
- maybe_run_queue_via_backing_queue(
+ run_backing_queue(
fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
State).
@@ -622,10 +622,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- maybe_run_queue_via_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end,
- State).
+ run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State).
-maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
+run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
@@ -756,29 +755,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ {run_backing_queue, _Fun} -> 6;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
case Msg of
- update_ram_duration -> 8;
- delete_immediately -> 8;
- {set_ram_duration_target, _Duration} -> 8;
- {set_maximum_since_use, _Age} -> 8;
- maybe_expire -> 8;
- drop_expired -> 8;
- emit_stats -> 7;
- {ack, _Txn, _MsgIds, _ChPid} -> 7;
- {reject, _MsgIds, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
- {unblock, _ChPid} -> 7;
- {maybe_run_queue_via_backing_queue, _Fun} -> 6;
- sync_timeout -> 6;
- _ -> 0
+ update_ram_duration -> 8;
+ delete_immediately -> 8;
+ {set_ram_duration_target, _Duration} -> 8;
+ {set_maximum_since_use, _Age} -> 8;
+ maybe_expire -> 8;
+ drop_expired -> 8;
+ emit_stats -> 7;
+ {ack, _Txn, _MsgIds, _ChPid} -> 7;
+ {reject, _MsgIds, _Requeue, _ChPid} -> 7;
+ {notify_sent, _ChPid} -> 7;
+ {unblock, _ChPid} -> 7;
+ {run_backing_queue, _Fun} -> 6;
+ sync_timeout -> 6;
+ _ -> 0
end.
prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
@@ -991,12 +990,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue_and_run(AckTags, State))
end;
-handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
- reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_call({run_backing_queue, Fun}, _From, State) ->
+ reply(ok, run_backing_queue(Fun, State)).
-handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
- noreply(maybe_run_queue_via_backing_queue(Fun, State));
+handle_cast({run_backing_queue, Fun}, State) ->
+ noreply(run_backing_queue(Fun, State));
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined}));