summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 16:22:17 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-10-04 16:22:17 +0100
commit0f8d9a753491fbd52393f398765a31394cc0a731 (patch)
treee2db52fa49ffb9b3677eb4b1fe8aae47f7d964b2
parentb1b6201cd23233b9aa9b727d43c386819007155f (diff)
downloadrabbitmq-server-0f8d9a753491fbd52393f398765a31394cc0a731.tar.gz
refactor + add async version of maybe_run_queue_via_backing_queue/2
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_variable_queue.erl67
3 files changed, 40 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c7d63d08..e1dc73e1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,6 +34,7 @@
-export([start/0, stop/0, declare/5, delete_exclusive/1, delete/3, purge/1]).
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
+ maybe_run_queue_via_backing_queue_async/2,
update_ram_duration/1, set_ram_duration_target/2,
set_maximum_since_use/2, maybe_expire/1]).
-export([pseudo_queue/2]).
@@ -159,6 +160,8 @@
rabbit_types:connection_exit()).
-spec(maybe_run_queue_via_backing_queue/2 ::
(pid(), (fun ((A) -> A))) -> 'ok').
+-spec(maybe_run_queue_via_backing_queue_async/2 ::
+ (pid(), (fun ((A) -> A))) -> 'ok').
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -457,6 +460,9 @@ internal_delete(QueueName) ->
maybe_run_queue_via_backing_queue(QPid, Fun) ->
gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity).
+maybe_run_queue_via_backing_queue_async(QPid, Fun) ->
+ gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}).
+
update_ram_duration(QPid) ->
gen_server2:cast(QPid, update_ram_duration).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ee74b3d2..d0d971ac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -878,6 +878,9 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, State));
+
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f29cf5f1..7d84c840 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -541,10 +541,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
PA1 = record_pending_ack(m(MsgStatus1), PA),
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = case NeedsConfirming of
- true -> gb_sets:insert(Guid, Unconfirmed);
- false -> Unconfirmed
- end,
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
{SeqId, a(State1 #vqstate {
next_seq_id = SeqId + 1,
out_counter = OutCount + 1,
@@ -823,6 +820,9 @@ one_if(false) -> 0.
cons_if(true, E, L) -> [E | L];
cons_if(false, _E, L) -> L.
+gb_sets_maybe_insert(false, _Val, Set) -> Set;
+gb_sets_maybe_insert(true, Val, Set) -> gb_sets:insert(Val, Set).
+
msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }) ->
#msg_status { seq_id = SeqId, guid = Guid, msg = Msg,
is_persistent = IsPersistent, is_delivered = false,
@@ -1066,10 +1066,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent,
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
PCount1 = PCount + one_if(IsPersistent1),
- Unconfirmed1 = case NeedsConfirming of
- true -> gb_sets:insert(Guid, Unconfirmed);
- false -> Unconfirmed
- end,
+ Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed),
{SeqId, State2 #vqstate {
next_seq_id = SeqId + 1,
len = Len + 1,
@@ -1212,36 +1209,34 @@ msgs_confirmed(GuidSet, State) ->
{remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}.
msgs_written_to_disk(QPid, Guids) ->
- spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- QPid,
- fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
- msgs_confirmed(
- gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:intersection(
- gb_sets:union(MOD, GuidSet), UC) })
- end)
- end).
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid,
+ fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(
+ gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MOD, GuidSet), UC) })
+ end).
msg_indices_written_to_disk(QPid, Guids) ->
- spawn(fun() -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
- QPid,
- fun(State = #vqstate { msgs_on_disk = MOD,
- msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
- msgs_confirmed(
- gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:intersection(
- gb_sets:union(MIOD, GuidSet), UC) })
- end)
- end).
+ rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
+ QPid,
+ fun(State = #vqstate { msgs_on_disk = MOD,
+ msg_indices_on_disk = MIOD,
+ unconfirmed = UC }) ->
+ GuidSet = gb_sets:from_list(Guids),
+ msgs_confirmed(
+ gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:intersection(
+ gb_sets:union(MIOD, GuidSet), UC) })
+ end).
%%----------------------------------------------------------------------------