diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 10:48:38 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 10:48:38 +0000 |
commit | 867bf496f0f3917bb3109b17464e7a3c5da20ae8 (patch) | |
tree | 82ec85486d770ff6a0464ea71a0b40a9772cce25 | |
parent | d4fa5254102756b8af4f95822d04285766346f31 (diff) | |
download | rabbitmq-server-867bf496f0f3917bb3109b17464e7a3c5da20ae8.tar.gz |
shorten maybe_run_queue_via_backing_queue
to something less misleading though arguably still quite obscure
Also move make it clear in the amqqueue API which exports are genuine
and which are for internal use only.
-rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 |
2 files changed, 43 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8e4ca8e3..0adaaa7f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -17,23 +17,24 @@ -module(rabbit_amqqueue). -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, - maybe_run_queue_via_backing_queue/2, - maybe_run_queue_via_backing_queue_async/2, - sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). --export([emit_stats/1]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +%% internal +-export([internal_declare/2, internal_delete/1, + run_backing_queue/2, run_backing_queue_async/2, + sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, + set_maximum_since_use/2, maybe_expire/1, drop_expired/1, + emit_stats/1]). + -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -140,9 +141,9 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(maybe_run_queue_via_backing_queue/2 :: +-spec(run_backing_queue/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). --spec(maybe_run_queue_via_backing_queue_async/2 :: +-spec(run_backing_queue_async/2 :: (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). @@ -438,11 +439,11 @@ internal_delete(QueueName) -> end end). -maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). -maybe_run_queue_via_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +run_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 460a97ce..55ee2ee3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -163,14 +163,14 @@ bq_init(BQ, QName, IsDurable, Recover) -> Self = self(), BQ:init(QName, IsDurable, Recover, fun (Fun) -> - rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, Fun) + rabbit_amqqueue:run_backing_queue_async(Self, Fun) end, fun (Fun) -> rabbit_misc:with_exit_handler( fun () -> error end, - fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, Fun) end) + fun () -> + rabbit_amqqueue:run_backing_queue(Self, Fun) + end) end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> @@ -517,7 +517,7 @@ deliver_or_enqueue(Delivery, State) -> end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> - maybe_run_queue_via_backing_queue( + run_backing_queue( fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, State). @@ -622,10 +622,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - maybe_run_queue_via_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, - State). + run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State). -maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> +run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, @@ -756,29 +755,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _MsgIds, _ChPid} -> 7; - {reject, _MsgIds, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _MsgIds, _ChPid} -> 7; + {reject, _MsgIds, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {run_backing_queue, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -991,12 +990,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). +handle_call({run_backing_queue, Fun}, _From, State) -> + reply(ok, run_backing_queue(Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast({run_backing_queue, Fun}, State) -> + noreply(run_backing_queue(Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); |