summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gm.erl16
1 files changed, 10 insertions, 6 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 01300f18..82196c73 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -533,7 +533,7 @@ init([GroupName, Module, Args]) ->
group_name = GroupName,
module = Module,
view = undefined,
- pub_count = 0,
+ pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
@@ -829,13 +829,14 @@ internal_broadcast(Msg, From, State = #state { self = Self,
confirms = Confirms,
callback_args = Args,
broadcast_buffer = Buffer }) ->
+ PubCount1 = PubCount + 1,
Result = Module:handle_msg(Args, get_pid(Self), Msg),
- Buffer1 = [{PubCount, Msg} | Buffer],
+ Buffer1 = [{PubCount1, Msg} | Buffer],
Confirms1 = case From of
none -> Confirms;
- _ -> queue:in({PubCount, From}, Confirms)
+ _ -> queue:in({PubCount1, From}, Confirms)
end,
- State1 = State #state { pub_count = PubCount + 1,
+ State1 = State #state { pub_count = PubCount1,
confirms = Confirms1,
broadcast_buffer = Buffer1 },
case From =/= none of
@@ -850,14 +851,17 @@ flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
- broadcast_buffer = Buffer }) ->
+ broadcast_buffer = Buffer,
+ pub_count = PubCount }) ->
+ [{PubCount, _Msg}|_] = Buffer,
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
- Member #member { pending_ack = PA1 }
+ Member #member { pending_ack = PA1,
+ last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [] }.