diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 00:31:49 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-03-05 00:31:49 +0000 |
commit | 40d08e7806c1980d428cd3065f71faa08e7239a9 (patch) | |
tree | 3841f92f54718652331fab0f91e1aa37597c72e6 /src/rabbit_variable_queue.erl | |
parent | bac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (diff) | |
download | rabbitmq-server-40d08e7806c1980d428cd3065f71faa08e7239a9.tar.gz |
make handling of confirms more obvious in BQ API
and fix some bugs introduced earlier
...amazingly it all seems to work now
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r-- | src/rabbit_variable_queue.erl | 51 |
1 files changed, 28 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 67c4cc3c..eca3d8d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -17,8 +17,8 @@ -module(rabbit_variable_queue). -export([init/5, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, + purge/1, publish/3, publish_delivered/4, drain_confirmed/1, + fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, @@ -255,6 +255,7 @@ msgs_on_disk, msg_indices_on_disk, unconfirmed, + confirmed, ack_out_counter, ack_in_counter, ack_rates @@ -353,6 +354,7 @@ msgs_on_disk :: gb_set(), msg_indices_on_disk :: gb_set(), unconfirmed :: gb_set(), + confirmed :: gb_set(), ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). @@ -443,8 +445,8 @@ init(QueueName, true, true, AsyncCallback, SyncCallback, rabbit_msg_store:contains(Guid, PersistentClient) end, MsgIdxOnDiskFun), - init(true, IndexState, DeltaCount, Terms1, - PersistentClient, TransientClient, AsyncCallback, SyncCallback). + init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback, + PersistentClient, TransientClient). terminate(State) -> State1 = #vqstate { persistent_count = PCount, @@ -549,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, persistent_count = PCount1, unconfirmed = UC1 }))}. +drain_confirmed(State = #vqstate { confirmed = C }) -> + {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. + dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), State1. @@ -981,7 +986,7 @@ msg_store_close_fds_fun(IsPersistent, Callback) -> fun (State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), - {[], State #vqstate { msg_store_clients = MSCState1 }} + State #vqstate { msg_store_clients = MSCState1 } end) end. @@ -1068,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %%---------------------------------------------------------------------------- init(IsDurable, IndexState, DeltaCount, Terms, - PersistentClient, TransientClient, AsyncCallback, SyncCallback) -> + AsyncCallback, SyncCallback, PersistentClient, TransientClient) -> {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount), @@ -1111,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, msgs_on_disk = gb_sets:new(), msg_indices_on_disk = gb_sets:new(), unconfirmed = gb_sets:new(), + confirmed = gb_sets:new(), ack_out_counter = 0, ack_in_counter = 0, ack_rates = blank_rate(Now, 0) }, @@ -1427,12 +1433,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) -> false -> State end. -remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, +record_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + unconfirmed = UC, + confirmed = C }) -> State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), - unconfirmed = gb_sets:difference(UC, GuidSet) }. + unconfirmed = gb_sets:difference(UC, GuidSet), + confirmed = gb_sets:union (C, GuidSet) }. needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, unconfirmed = UC }) -> @@ -1449,11 +1457,8 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, %% subtraction. not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). -msgs_confirmed(GuidSet, State) -> - {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. - blind_confirm(Callback, GuidSet) -> - Callback(fun (State) -> msgs_confirmed(GuidSet, State) end). + Callback(fun (State) -> record_confirms(GuidSet, State) end). msgs_written_to_disk(Callback, GuidSet, removed) -> blind_confirm(Callback, GuidSet); @@ -1461,22 +1466,22 @@ msgs_written_to_disk(Callback, GuidSet, written) -> Callback(fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union( - MOD, gb_sets:intersection(UC, GuidSet)) }) + record_confirms(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union( + MOD, gb_sets:intersection(UC, GuidSet)) }) end). msg_indices_written_to_disk(Callback, GuidSet) -> Callback(fun (State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - msgs_confirmed(gb_sets:intersection(GuidSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union( - MIOD, gb_sets:intersection(UC, GuidSet)) }) + record_confirms(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union( + MIOD, gb_sets:intersection(UC, GuidSet)) }) end). %%---------------------------------------------------------------------------- |