summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-03-05 00:31:49 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2011-03-05 00:31:49 +0000
commit40d08e7806c1980d428cd3065f71faa08e7239a9 (patch)
tree3841f92f54718652331fab0f91e1aa37597c72e6 /src/rabbit_variable_queue.erl
parentbac67caafb6c00f2141a1da98d29c29dfb6bf8d9 (diff)
downloadrabbitmq-server-40d08e7806c1980d428cd3065f71faa08e7239a9.tar.gz
make handling of confirms more obvious in BQ API
and fix some bugs introduced earlier ...amazingly it all seems to work now
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl51
1 files changed, 28 insertions, 23 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 67c4cc3c..eca3d8d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -17,8 +17,8 @@
-module(rabbit_variable_queue).
-export([init/5, terminate/1, delete_and_terminate/1,
- purge/1, publish/3, publish_delivered/4, fetch/2, ack/2,
- tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
+ purge/1, publish/3, publish_delivered/4, drain_confirmed/1,
+ fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4,
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
@@ -255,6 +255,7 @@
msgs_on_disk,
msg_indices_on_disk,
unconfirmed,
+ confirmed,
ack_out_counter,
ack_in_counter,
ack_rates
@@ -353,6 +354,7 @@
msgs_on_disk :: gb_set(),
msg_indices_on_disk :: gb_set(),
unconfirmed :: gb_set(),
+ confirmed :: gb_set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
@@ -443,8 +445,8 @@ init(QueueName, true, true, AsyncCallback, SyncCallback,
rabbit_msg_store:contains(Guid, PersistentClient)
end,
MsgIdxOnDiskFun),
- init(true, IndexState, DeltaCount, Terms1,
- PersistentClient, TransientClient, AsyncCallback, SyncCallback).
+ init(true, IndexState, DeltaCount, Terms1, AsyncCallback, SyncCallback,
+ PersistentClient, TransientClient).
terminate(State) ->
State1 = #vqstate { persistent_count = PCount,
@@ -549,6 +551,9 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent,
persistent_count = PCount1,
unconfirmed = UC1 }))}.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}.
+
dropwhile(Pred, State) ->
{_OkOrEmpty, State1} = dropwhile1(Pred, State),
State1.
@@ -981,7 +986,7 @@ msg_store_close_fds_fun(IsPersistent, Callback) ->
fun (State = #vqstate { msg_store_clients = MSCState }) ->
{ok, MSCState1} =
msg_store_close_fds(MSCState, IsPersistent),
- {[], State #vqstate { msg_store_clients = MSCState1 }}
+ State #vqstate { msg_store_clients = MSCState1 }
end)
end.
@@ -1068,7 +1073,7 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%%----------------------------------------------------------------------------
init(IsDurable, IndexState, DeltaCount, Terms,
- PersistentClient, TransientClient, AsyncCallback, SyncCallback) ->
+ AsyncCallback, SyncCallback, PersistentClient, TransientClient) ->
{LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),
DeltaCount1 = proplists:get_value(persistent_count, Terms, DeltaCount),
@@ -1111,6 +1116,7 @@ init(IsDurable, IndexState, DeltaCount, Terms,
msgs_on_disk = gb_sets:new(),
msg_indices_on_disk = gb_sets:new(),
unconfirmed = gb_sets:new(),
+ confirmed = gb_sets:new(),
ack_out_counter = 0,
ack_in_counter = 0,
ack_rates = blank_rate(Now, 0) },
@@ -1427,12 +1433,14 @@ confirm_commit_index(State = #vqstate { index_state = IndexState }) ->
false -> State
end.
-remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
+record_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
- unconfirmed = UC }) ->
+ unconfirmed = UC,
+ confirmed = C }) ->
State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet),
msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet),
- unconfirmed = gb_sets:difference(UC, GuidSet) }.
+ unconfirmed = gb_sets:difference(UC, GuidSet),
+ confirmed = gb_sets:union (C, GuidSet) }.
needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
@@ -1449,11 +1457,8 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD,
%% subtraction.
not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)).
-msgs_confirmed(GuidSet, State) ->
- {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}.
-
blind_confirm(Callback, GuidSet) ->
- Callback(fun (State) -> msgs_confirmed(GuidSet, State) end).
+ Callback(fun (State) -> record_confirms(GuidSet, State) end).
msgs_written_to_disk(Callback, GuidSet, removed) ->
blind_confirm(Callback, GuidSet);
@@ -1461,22 +1466,22 @@ msgs_written_to_disk(Callback, GuidSet, written) ->
Callback(fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
- State #vqstate {
- msgs_on_disk =
- gb_sets:union(
- MOD, gb_sets:intersection(UC, GuidSet)) })
+ record_confirms(gb_sets:intersection(GuidSet, MIOD),
+ State #vqstate {
+ msgs_on_disk =
+ gb_sets:union(
+ MOD, gb_sets:intersection(UC, GuidSet)) })
end).
msg_indices_written_to_disk(Callback, GuidSet) ->
Callback(fun (State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
- State #vqstate {
- msg_indices_on_disk =
- gb_sets:union(
- MIOD, gb_sets:intersection(UC, GuidSet)) })
+ record_confirms(gb_sets:intersection(GuidSet, MOD),
+ State #vqstate {
+ msg_indices_on_disk =
+ gb_sets:union(
+ MIOD, gb_sets:intersection(UC, GuidSet)) })
end).
%%----------------------------------------------------------------------------