diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-13 18:51:48 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-13 18:51:48 +0000 |
commit | 88282eb072f38d71f26f9cb6d663685ef095bfd5 (patch) | |
tree | ed113af73829acc99fc7d26e7823f6bfea70d9ad /src | |
parent | 01c75632144c5d054bbeb893ac4d0cda2d56b825 (diff) | |
download | rabbitmq-server-88282eb072f38d71f26f9cb6d663685ef095bfd5.tar.gz |
improve consistency of API
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 8 |
3 files changed, 12 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 775c631d..71fd7a17 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -152,9 +152,9 @@ (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> A | {any(), A}))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78bb6835..e559b9c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -226,7 +226,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), + [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -520,7 +520,7 @@ deliver_or_enqueue(Delivery, State) -> requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -617,12 +617,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {BQS2, State1} = - case Fun(BQS) of - {{confirm, Guids}, BQS1} -> {BQS1, confirm_messages(Guids, State)}; - BQS1 -> {BQS1, State} - end, - run_message_queue(State1#q{backing_queue_state = BQS2}). + {Guids, BQS1} = Fun(BQS), + run_message_queue( + confirm_messages(Guids, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -1107,7 +1104,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:idle_timeout(BQS) end, State)); + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0799a1b5..88c61d57 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1096,9 +1096,9 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) + Self, fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -1401,7 +1401,7 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, unconfirmed = gb_sets:difference(UC, GuidSet) }. msgs_confirmed(GuidSet, State) -> - {{confirm, gb_sets:to_list(GuidSet)}, remove_confirms(GuidSet, State)}. + {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. msgs_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( |