diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 00:31:49 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 00:31:49 +0000 |
commit | 40d08e7806c1980d428cd3065f71faa08e7239a9 (patch) | |
tree | 3841f92f54718652331fab0f91e1aa37597c72e6 | |
parent | bac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (diff) | |
download | rabbitmq-server-40d08e7806c1980d428cd3065f71faa08e7239a9.tar.gz |
make handling of confirms more obvious in BQ API
and fix some bugs introduced earlier
...amazingly it all seems to work now
-rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
4 files changed, 49 insertions, 38 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 2e4d1b0a..b2bf6bbb 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -43,6 +43,7 @@ (false, rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> {undefined, state()}). +-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 069b803e..4d8b936a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -215,13 +215,15 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, NewState1, Timeout}. -next_state(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - ensure_rate_timer(State), - State2 = ensure_stats_timer(State1), - case BQ:needs_idle_timeout(BQS) of - true -> {ensure_sync_timer(State2), 0}; - false -> {stop_sync_timer(State2), hibernate} +next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + {Guids, BQS1} = BQ:drain_confirmed(BQS), + BQNeedsSync = BQ:needs_idle_timeout(BQS1), + State1 = ensure_stats_timer( + ensure_rate_timer( + confirm_messages(Guids, State#q{backing_queue_state = BQS1}))), + case BQNeedsSync of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -418,6 +420,8 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages([], State) -> + State; confirm_messages(Guids, State = #q{guid_to_channel = GTC}) -> {CMs, GTC1} = lists:foldl( @@ -523,9 +527,8 @@ deliver_or_enqueue(Delivery, State) -> requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> - {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} - end, State). + fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, + State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -628,13 +631,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + maybe_run_queue_via_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, + State). maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {Guids, BQS1} = Fun(BQS), - run_message_queue( - confirm_messages(Guids, State#q{backing_queue_state = BQS1})). + run_message_queue(State#q{backing_queue_state = Fun(BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index a8e201ea..b06f1e9c 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -54,6 +54,10 @@ behaviour_info(callbacks) -> %% (i.e. saves the round trip through the backing queue). {publish_delivered, 4}, + %% Return ids of messages which have been confirmed since + %% the last invocation of this function (or initialisation). + {drain_confirmed, 1}, + %% Drop messages from the head of the queue while the supplied %% predicate returns true. {dropwhile, 2}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 67c4cc3c..eca3d8d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,8 +17,8 @@ -module(rabbit_variable_queue). -export([init/5, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + purge/1, publish/3, publish_delivered/4, drain_confirmed/1, + fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -255,6 +255,7 @@ msgs_on_disk, msg_indices_on_disk, unconfirmed, + confirmed, ack_out_counter, ack_in_counter, ack_rates @@ -353,6 +354,7 @@ msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), unconfirmed :: gb_set(), + confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). @@ -443,8 +445,8 @@ init(QueueName, true, true, AsyncCallback, SyncCallback, rabbit_msg_store:contains(Guid, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, - PersistentClient, TransientClient, AsyncCallback, SyncCallback). + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, @@ -549,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +drain_confirmed(State = #vqstate { confirmed = C }) -> + {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), State1. @@ -981,7 +986,7 @@ msg_store_close_fds_fun(IsPersistent, Callback) -> fun (State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} + State #vqstate { msg_store_clients = MSCState1 } end) end. @@ -1068,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient, AsyncCallback, SyncCallback) -> + AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1111,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), + confirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, ack_rates = blank_rate(Now, 0) }, @@ -1427,12 +1433,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, +record_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + unconfirmed = UC, + confirmed = C }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), - unconfirmed = gb_sets:difference(UC, GuidSet) }. + unconfirmed = gb_sets:difference(UC, GuidSet), + confirmed = gb_sets:union (C, GuidSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1449,11 +1457,8 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(GuidSet, State) -> - {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. - blind_confirm(Callback, GuidSet) -> - Callback(fun (State) -> msgs_confirmed(GuidSet, State) end). + Callback(fun (State) -> record_confirms(GuidSet, State) end). msgs_written_to_disk(Callback, GuidSet, removed) -> blind_confirm(Callback, GuidSet); @@ -1461,22 +1466,22 @@ msgs_written_to_disk(Callback, GuidSet, written) -> Callback(fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union( - MOD, gb_sets:intersection(UC, GuidSet)) }) + record_confirms(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union( + MOD, gb_sets:intersection(UC, GuidSet)) }) end). msg_indices_written_to_disk(Callback, GuidSet) -> Callback(fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union( - MIOD, gb_sets:intersection(UC, GuidSet)) }) + record_confirms(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union( + MIOD, gb_sets:intersection(UC, GuidSet)) }) end). %%---------------------------------------------------------------------------- |