summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 00:31:49 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 00:31:49 +0000
commit40d08e7806c1980d428cd3065f71faa08e7239a9 (patch)
tree3841f92f54718652331fab0f91e1aa37597c72e6
parentbac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (diff)
downloadrabbitmq-server-40d08e7806c1980d428cd3065f71faa08e7239a9.tar.gz
make handling of confirms more obvious in BQ API
and fix some bugs introduced earlier ...amazingly it all seems to work now
-rw-r--r--include/rabbit_backing_queue_spec.hrl1
-rw-r--r--src/rabbit_amqqueue_process.erl31
-rw-r--r--src/rabbit_backing_queue.erl4
-rw-r--r--src/rabbit_variable_queue.erl51
4 files changed, 49 insertions, 38 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 2e4d1b0a..b2bf6bbb 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -43,6 +43,7 @@
(false, rabbit_types:basic_message(),
rabbit_types:message_properties(), state())
-> {undefined, state()}).
+-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
-spec(dropwhile/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), state())
-> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 069b803e..4d8b936a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -215,13 +215,15 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, NewState1, Timeout}.
-next_state(State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- ensure_rate_timer(State),
- State2 = ensure_stats_timer(State1),
- case BQ:needs_idle_timeout(BQS) of
- true -> {ensure_sync_timer(State2), 0};
- false -> {stop_sync_timer(State2), hibernate}
+next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ {Guids, BQS1} = BQ:drain_confirmed(BQS),
+ BQNeedsSync = BQ:needs_idle_timeout(BQS1),
+ State1 = ensure_stats_timer(
+ ensure_rate_timer(
+ confirm_messages(Guids, State#q{backing_queue_state = BQS1}))),
+ case BQNeedsSync of
+ true -> {ensure_sync_timer(State1), 0};
+ false -> {stop_sync_timer(State1), hibernate}
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -418,6 +420,8 @@ deliver_from_queue_deliver(AckRequired, false, State) ->
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
+confirm_messages([], State) ->
+ State;
confirm_messages(Guids, State = #q{guid_to_channel = GTC}) ->
{CMs, GTC1} =
lists:foldl(
@@ -523,9 +527,8 @@ deliver_or_enqueue(Delivery, State) ->
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) ->
- {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)}
- end, State).
+ fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end,
+ State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
@@ -628,13 +631,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
qname(#q{q = #amqqueue{name = QName}}) -> QName.
backing_queue_idle_timeout(State = #q{backing_queue = BQ}) ->
- maybe_run_queue_via_backing_queue(
- fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State).
+ maybe_run_queue_via_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end,
+ State).
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
- {Guids, BQS1} = Fun(BQS),
- run_message_queue(
- confirm_messages(Guids, State#q{backing_queue_state = BQS1})).
+ run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
commit_transaction(Txn, From, C = #cr{acktags = ChAckTags},
State = #q{backing_queue = BQ,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index a8e201ea..b06f1e9c 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -54,6 +54,10 @@ behaviour_info(callbacks) ->
%% (i.e. saves the round trip through the backing queue).
{publish_delivered, 4},
+ %% Return ids of messages which have been confirmed since
+ %% the last invocation of this function (or initialisation).
+ {drain_confirmed, 1},
+
%% Drop messages from the head of the queue while the supplied
%% predicate returns true.
{dropwhile, 2},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 67c4cc3c..eca3d8d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,8 +17,8 @@
-module(rabbit_variable_queue).
-export([init/5, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -255,6 +255,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -353,6 +354,7 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
@@ -443,8 +445,8 @@ init(QueueName, true, true, AsyncCallback, SyncCallback,
rabbit_msg_store:contains(Guid, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
- PersistentClient, TransientClient, AsyncCallback, SyncCallback).
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
+ PersistentClient, TransientClient).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
@@ -549,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
State1.
@@ -981,7 +986,7 @@ msg_store_close_fds_fun(IsPersistent, Callback) ->
fun (State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} =
msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
+ State #vqstate { msg_store_clients = MSCState1 }
end)
end.
@@ -1068,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient, AsyncCallback, SyncCallback) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1111,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1427,12 +1433,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+record_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ unconfirmed = UC,
+ confirmed = C }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
- unconfirmed = gb_sets:difference(UC, GuidSet) }.
+ unconfirmed = gb_sets:difference(UC, GuidSet),
+ confirmed = gb_sets:union (C, GuidSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1449,11 +1457,8 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(GuidSet, State) ->
- {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-
blind_confirm(Callback, GuidSet) ->
- Callback(fun (State) -> msgs_confirmed(GuidSet, State) end).
+ Callback(fun (State) -> record_confirms(GuidSet, State) end).
msgs_written_to_disk(Callback, GuidSet, removed) ->
blind_confirm(Callback, GuidSet);
@@ -1461,22 +1466,22 @@ msgs_written_to_disk(Callback, GuidSet, written) ->
Callback(fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, GuidSet)) })
+ record_confirms(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, GuidSet)) })
end).
msg_indices_written_to_disk(Callback, GuidSet) ->
Callback(fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, GuidSet)) })
+ record_confirms(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, GuidSet)) })
end).
%%----------------------------------------------------------------------------