diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-19 20:16:22 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-19 20:16:22 +0000 |
commit | 005788d47882dade23b7c3b605bcafde4107222d (patch) | |
tree | 830904edb41914798109f44e65afbdff315d8f6b | |
parent | db3848bb726db4019c8e595e72109e3daa81ab7e (diff) | |
download | rabbitmq-server-005788d47882dade23b7c3b605bcafde4107222d.tar.gz |
eager sync of messages pending ack
-rw-r--r-- | docs/rabbitmqctl.1.xml | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 26 |
5 files changed, 23 insertions, 24 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index c7069aed..bbd2fe5b 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -465,8 +465,7 @@ synchronise itself. The queue will block while synchronisation takes place (all publishers to and consumers from the queue will block). The queue must be - mirrored, and must not have any pending unacknowledged - messages for this command to succeed. + mirrored for this command to succeed. </para> <para> Note that unsynchronised queues from which messages are diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2477b891..21b6bb92 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -174,8 +174,7 @@ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -spec(start_mirroring/1 :: (pid()) -> 'ok'). -spec(stop_mirroring/1 :: (pid()) -> 'ok'). --spec(sync_mirrors/1 :: (pid()) -> - 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). +-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). -endif. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0a07a005..2795e317 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1163,7 +1163,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue(AckTags, ChPid, State)); handle_call(sync_mirrors, _From, - State = #q{backing_queue = rabbit_mirror_queue_master = BQ, + State = #q{backing_queue = rabbit_mirror_queue_master, backing_queue_state = BQS}) -> S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end, HandleInfo = fun (Status) -> @@ -1179,13 +1179,9 @@ handle_call(sync_mirrors, _From, State, #q.stats_timer, fun() -> emit_stats(State#q{status = Status}) end) end, - case BQ:depth(BQS) - BQ:len(BQS) of - 0 -> case rabbit_mirror_queue_master:sync_mirrors( - HandleInfo, EmitStats, BQS) of - {ok, BQS1} -> reply(ok, S(BQS1)); - {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} - end; - _ -> reply({error, pending_acks}, State) + case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of + {ok, BQS1} -> reply(ok, S(BQS1)); + {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)} end; handle_call(sync_mirrors, _From, State) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9f12b34e..b63fccc9 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -230,7 +230,6 @@ handle_cast({sync_start, Ref, Syncer}, S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined, rate_timer_ref = TRefN, backing_queue_state = BQSN} end, - %% [0] We can only sync when there are no pending acks case rabbit_mirror_queue_sync:slave( DD, Ref, TRef, Syncer, BQ, BQS, fun (BQN, BQSN) -> @@ -240,7 +239,7 @@ handle_cast({sync_start, Ref, Syncer}, {TRefN, BQSN1} end) of denied -> noreply(State1); - {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0] + {ok, Res} -> noreply(set_delta(0, S(Res))); {failed, Res} -> noreply(S(Res)); {stop, Reason, Res} -> {stop, Reason, S(Res)} end; diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 4d6b1fc9..b023823e 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -91,16 +91,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, false, Acc) -> - master_send(Msg, MsgProps, Args, Acc) + case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> + master_send(Msg, MsgProps, Unacked, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) end. -master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, - {I, Last}) -> +master_send(Msg, MsgProps, Unacked, + {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), @@ -119,7 +119,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}), gen_server2:reply(From, ok), {stop, cancelled}; - {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps}, + {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked}, {cont, {I + 1, T}}; {'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}}; {'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}} @@ -164,11 +164,11 @@ syncer(Ref, Log, MPid, SPids) -> syncer_loop(Ref, MPid, SPids) -> MPid ! {next, Ref}, receive - {msg, Ref, Msg, MsgProps} -> + {msg, Ref, Msg, MsgProps, Unacked} -> SPids1 = wait_for_credit(SPids), [begin credit_flow:send(SPid), - SPid ! {sync_msg, Ref, Msg, MsgProps} + SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked} end || SPid <- SPids1], syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> @@ -204,7 +204,7 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) -> slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) -> MRef = erlang:monitor(process, Syncer), Syncer ! {sync_ready, Ref, self()}, - {_MsgCount, BQS1} = BQ:purge(BQS), + {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)), slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration, rabbit_misc:get_parent()}, TRef, BQS1). @@ -237,10 +237,16 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, update_ram_duration -> {TRef1, BQS1} = UpdateRamDuration(BQ, BQS), slave_sync_loop(Args, TRef1, BQS1); - {sync_msg, Ref, Msg, Props} -> + {sync_msg, Ref, Msg, Props, Unacked} -> credit_flow:ack(Syncer), Props1 = Props#message_properties{needs_confirming = false}, - BQS1 = BQ:publish(Msg, Props1, true, none, BQS), + BQS1 = case Unacked of + false -> BQ:publish(Msg, Props1, true, none, BQS); + true -> {_AckTag, BQS2} = BQ:publish_delivered( + Msg, Props1, none, BQS), + %% TODO do something w AckTag + BQS2 + end, slave_sync_loop(Args, TRef, BQS1); {'EXIT', Parent, Reason} -> {stop, Reason, {TRef, BQS}}; |