summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:24:21 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-14 12:24:21 +0000
commit6fc0feb3e036f45a226e2e323cb018758a5e8db7 (patch)
tree5e0d4fb15d24cd8ba3a4f40ace12028a1faf50b2
parent88282eb072f38d71f26f9cb6d663685ef095bfd5 (diff)
downloadrabbitmq-server-6fc0feb3e036f45a226e2e323cb018758a5e8db7.tar.gz
Simplify code surrounding the return from BQ:ack
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_variable_queue.erl56
2 files changed, 29 insertions, 29 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 352e76fd..8603d8d7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -77,7 +77,7 @@ behaviour_info(callbacks) ->
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about.
+ %% about. Must return 1 guid per Ack, in the same order as Acks.
{ack, 2},
%% A publish, but in the context of a transaction.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 88c61d57..565c61e7 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1311,10 +1311,8 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true ->
- {{IsPersistent, Guid, MsgProps}, RAI};
- false ->
- {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
+ true -> {{IsPersistent, Guid, MsgProps}, RAI};
+ false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
State #vqstate { pending_ack = PA1,
@@ -1325,8 +1323,8 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3,
- {[], orddict:new()}, PA),
+ {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1336,18 +1334,17 @@ remove_pending_ack(KeepPersistent,
Guids),
State1
end;
- false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- ok = orddict:fold(
- fun (IsPersistent, Guids, ok) ->
- msg_store_remove(MSCState, IsPersistent, Guids)
- end, ok, GuidsByStore),
+ false -> IndexState1 =
+ rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{SeqIds, GuidsByStore},
+ {{PersistentSeqIds, GuidsByStore, AllGuids},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1361,27 +1358,30 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
pending_ack = dict:erase(SeqId, PA),
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
- end, {{[], orddict:new()}, State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState),
- AckdGuids = lists:append(
- orddict:fold(
- fun (IsPersistent, Guids, Gs) ->
- MsgStoreFun(MSCState, IsPersistent, Guids),
- [Guids | Gs]
- end, [], GuidsByStore)),
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ [ok = MsgStoreFun(MSCState, IsPersistent, Guids)
+ || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {AckdGuids, State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ {lists:reverse(AllGuids),
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }}.
+
+accumulate_ack_init() -> {[], orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false }, Acc) ->
- Acc;
-accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) ->
- {cons_if(IsPersistent, SeqId, SeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}.
+ index_on_disk = false,
+ guid = Guid },
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
+ {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
+ [Guid | AllGuids]}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of