summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_sync.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-19 21:32:01 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-19 21:32:01 +0000
commitf27c502034c9e5218e280c4a39da88562b466f51 (patch)
tree59b17d795623020fc51080882dd0331f4af0ea20 /src/rabbit_mirror_queue_sync.erl
parent005788d47882dade23b7c3b605bcafde4107222d (diff)
downloadrabbitmq-server-f27c502034c9e5218e280c4a39da88562b466f51.tar.gz
populate slave's msg_id_ack with sync'ed messages pending ackbug25394
Diffstat (limited to 'src/rabbit_mirror_queue_sync.erl')
-rw-r--r--src/rabbit_mirror_queue_sync.erl45
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index b023823e..b8cfe4a9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -57,6 +57,9 @@
-type(log_fun() :: fun ((string(), [any()]) -> 'ok')).
-type(bq() :: atom()).
-type(bqs() :: any()).
+-type(ack() :: any()).
+-type(slave_sync_state() :: {[{rabbit_types:msg_id(), ack()}], timer:tref(),
+ bqs()}).
-spec(master_prepare/3 :: (reference(), log_fun(), [pid()]) -> pid()).
-spec(master_go/7 :: (pid(), reference(), log_fun(),
@@ -69,8 +72,8 @@
-spec(slave/7 :: (non_neg_integer(), reference(), timer:tref(), pid(),
bq(), bqs(), fun((bq(), bqs()) -> {timer:tref(), bqs()})) ->
'denied' |
- {'ok' | 'failed', {timer:tref(), bqs()}} |
- {'stop', any(), {timer:tref(), bqs()}}).
+ {'ok' | 'failed', slave_sync_state()} |
+ {'stop', any(), slave_sync_state()}).
-endif.
@@ -206,10 +209,10 @@ slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
Syncer ! {sync_ready, Ref, self()},
{_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
- rabbit_misc:get_parent()}, TRef, BQS1).
+ rabbit_misc:get_parent()}, {[], TRef, BQS1}).
slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
- TRef, BQS) ->
+ State = {MA, TRef, BQS}) ->
receive
{'DOWN', MRef, process, Syncer, _Reason} ->
%% If the master dies half way we are not in the usual
@@ -218,40 +221,40 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% sync with a newly promoted master, or even just receive
%% messages from it, we have a hole in the middle. So the
%% only thing to do here is purge.
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
credit_flow:peer_down(Syncer),
- {failed, {TRef, BQS1}};
+ {failed, {[], TRef, BQS1}};
{bump_credit, Msg} ->
credit_flow:handle_bump_msg(Msg),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{sync_complete, Ref} ->
erlang:demonitor(MRef, [flush]),
credit_flow:peer_down(Syncer),
- {ok, {TRef, BQS}};
+ {ok, State};
{'$gen_cast', {set_maximum_since_use, Age}} ->
ok = file_handle_cache:set_maximum_since_use(Age),
- slave_sync_loop(Args, TRef, BQS);
+ slave_sync_loop(Args, State);
{'$gen_cast', {set_ram_duration_target, Duration}} ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- slave_sync_loop(Args, TRef, BQS1);
+ slave_sync_loop(Args, {MA, TRef, BQS1});
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
- slave_sync_loop(Args, TRef1, BQS1);
+ slave_sync_loop(Args, {MA, TRef1, BQS1});
{sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = case Unacked of
- false -> BQ:publish(Msg, Props1, true, none, BQS);
- true -> {_AckTag, BQS2} = BQ:publish_delivered(
- Msg, Props1, none, BQS),
- %% TODO do something w AckTag
- BQS2
- end,
- slave_sync_loop(Args, TRef, BQS1);
+ {MA1, BQS1} =
+ case Unacked of
+ false -> {MA, BQ:publish(Msg, Props1, true, none, BQS)};
+ true -> {AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ {[{Msg#basic_message.id, AckTag} | MA], BQS2}
+ end,
+ slave_sync_loop(Args, {MA1, TRef, BQS1});
{'EXIT', Parent, Reason} ->
- {stop, Reason, {TRef, BQS}};
+ {stop, Reason, State};
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}}
end.