diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 16:22:17 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-10-04 16:22:17 +0100 |
commit | 0f8d9a753491fbd52393f398765a31394cc0a731 (patch) | |
tree | e2db52fa49ffb9b3677eb4b1fe8aae47f7d964b2 | |
parent | b1b6201cd23233b9aa9b727d43c386819007155f (diff) | |
download | rabbitmq-server-0f8d9a753491fbd52393f398765a31394cc0a731.tar.gz |
refactor + add async version of maybe_run_queue_via_backing_queue/2
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 3 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 67 |
3 files changed, 40 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c7d63d08..e1dc73e1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,6 +34,7 @@ -export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, + maybe_run_queue_via_backing_queue_async/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1]). -export([pseudo_queue/2]). @@ -159,6 +160,8 @@ rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: (pid(), (fun ((A) -> A))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/2 :: + (pid(), (fun ((A) -> A))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -457,6 +460,9 @@ internal_delete(QueueName) -> maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +maybe_run_queue_via_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index ee74b3d2..d0d971ac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -878,6 +878,9 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); + handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f29cf5f1..7d84c840 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -541,10 +541,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), PA1 = record_pending_ack(m(MsgStatus1), PA), PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = case NeedsConfirming of - true -> gb_sets:insert(Guid, Unconfirmed); - false -> Unconfirmed - end, + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, out_counter = OutCount + 1, @@ -823,6 +820,9 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. +gb_sets_maybe_insert(false, _Val, Set) -> Set; +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set). + msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, is_persistent = IsPersistent, is_delivered = false, @@ -1066,10 +1066,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), - Unconfirmed1 = case NeedsConfirming of - true -> gb_sets:insert(Guid, Unconfirmed); - false -> Unconfirmed - end, + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, @@ -1212,36 +1209,34 @@ msgs_confirmed(GuidSet, State) -> {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}. msgs_written_to_disk(QPid, Guids) -> - spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - QPid, - fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), - msgs_confirmed( - gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:intersection( - gb_sets:union(MOD, GuidSet), UC) }) - end) - end). + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed( + gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). msg_indices_written_to_disk(QPid, Guids) -> - spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - QPid, - fun(State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), - msgs_confirmed( - gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:intersection( - gb_sets:union(MIOD, GuidSet), UC) }) - end) - end). + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, + fun(State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + GuidSet = gb_sets:from_list(Guids), + msgs_confirmed( + gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). %%---------------------------------------------------------------------------- |