summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 18:31:49 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-21 18:31:49 +0000
commitf45fa72f76d72aca74e87347dfd8de6ff3522800 (patch)
tree1464e0d8310c3544ef55a41e0397a970b0321e1f /src/rabbit_variable_queue.erl
parent5647d8a9e4e0d3d8c27cb72941ab3f33dde5048e (diff)
downloadrabbitmq-server-f45fa72f76d72aca74e87347dfd8de6ff3522800.tar.gz
simplify variable_queue:ack's return format
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl71
1 files changed, 32 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6264255b..aa6d3a87 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -654,11 +654,9 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
persistent_count = PCount1 })}.
ack(AckTags, State) ->
- {Guids, State1} =
- ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State),
- {Guids, a(State1)}.
+ a(ack(fun msg_store_remove/3,
+ fun (_, State0) -> State0 end,
+ AckTags, State)).
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps,
State = #vqstate { durable = IsDurable,
@@ -708,23 +706,22 @@ tx_commit(Txn, Fun, MsgPropsFun,
end)}.
requeue(AckTags, MsgPropsFun, State) ->
- {_Guids, State1} =
- ack(fun msg_store_release/3,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, Guid, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, Guid),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- a(reduce_memory_use(State1)).
+ a(reduce_memory_use(
+ ack(fun msg_store_release/3,
+ fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
+ {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps),
+ true, false, State1),
+ State2;
+ ({IsPersistent, Guid, MsgProps}, State1) ->
+ #vqstate { msg_store_clients = MSCState } = State1,
+ {{ok, Msg = #basic_message{}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, Guid),
+ State2 = State1 #vqstate { msg_store_clients = MSCState1 },
+ {_SeqId, State3} = publish(Msg, MsgPropsFun(MsgProps),
+ true, true, State2),
+ State3
+ end,
+ AckTags, State))).
len(#vqstate { len = Len }) -> Len.
@@ -1156,7 +1153,6 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
durable = IsDurable }) ->
PAcks = lists:append(SPAcks),
Acks = lists:append(SAcks),
- {_Guids, NewState} = ack(Acks, State),
Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs),
{Msg, MsgProps} <- lists:reverse(PubsN)],
{SeqIds, State1 = #vqstate { index_state = IndexState }} =
@@ -1168,7 +1164,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync {
{SeqId, State3} =
publish(Msg, MsgProps, false, IsPersistent1, State2),
{cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3}
- end, {PAcks, NewState}, Pubs),
+ end, {PAcks, ack(Acks, State)}, Pubs),
IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState),
[ Fun() || Fun <- lists:reverse(SFuns) ],
reduce_memory_use(
@@ -1319,7 +1315,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, GuidsByStore, _AllGuids} =
+ {PersistentSeqIds, GuidsByStore} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1338,9 +1334,9 @@ remove_pending_ack(KeepPersistent,
end.
ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
+ State;
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, GuidsByStore, AllGuids},
+ {{PersistentSeqIds, GuidsByStore},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1360,24 +1356,21 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
|| {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)],
PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len(
orddict:new(), GuidsByStore)),
- {lists:reverse(AllGuids),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
+ State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) }.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {[], orddict:new()}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
- index_on_disk = false,
- guid = Guid },
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
- {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]};
+ index_on_disk = false },
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
+ {PersistentSeqIdsAcc, GuidsByStore};
accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps},
- {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) ->
+ {PersistentSeqIdsAcc, GuidsByStore}) ->
{cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore),
- [Guid | AllGuids]}.
+ rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore)}.
find_persistent_count(LensByStore) ->
case orddict:find(true, LensByStore) of