summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-19 20:16:22 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-19 20:16:22 +0000
commit005788d47882dade23b7c3b605bcafde4107222d (patch)
tree830904edb41914798109f44e65afbdff315d8f6b
parentdb3848bb726db4019c8e595e72109e3daa81ab7e (diff)
downloadrabbitmq-server-005788d47882dade23b7c3b605bcafde4107222d.tar.gz
eager sync of messages pending ack
-rw-r--r--docs/rabbitmqctl.1.xml3
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl3
-rw-r--r--src/rabbit_mirror_queue_sync.erl26
5 files changed, 23 insertions, 24 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index c7069aed..bbd2fe5b 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -465,8 +465,7 @@
synchronise itself. The queue will block while
synchronisation takes place (all publishers to and
consumers from the queue will block). The queue must be
- mirrored, and must not have any pending unacknowledged
- messages for this command to succeed.
+ mirrored for this command to succeed.
</para>
<para>
Note that unsynchronised queues from which messages are
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2477b891..21b6bb92 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -174,8 +174,7 @@
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-spec(start_mirroring/1 :: (pid()) -> 'ok').
-spec(stop_mirroring/1 :: (pid()) -> 'ok').
--spec(sync_mirrors/1 :: (pid()) ->
- 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
+-spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('not_mirrored')).
-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
-endif.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0a07a005..2795e317 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1163,7 +1163,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_call(sync_mirrors, _From,
- State = #q{backing_queue = rabbit_mirror_queue_master = BQ,
+ State = #q{backing_queue = rabbit_mirror_queue_master,
backing_queue_state = BQS}) ->
S = fun(BQSN) -> State#q{backing_queue_state = BQSN} end,
HandleInfo = fun (Status) ->
@@ -1179,13 +1179,9 @@ handle_call(sync_mirrors, _From,
State, #q.stats_timer,
fun() -> emit_stats(State#q{status = Status}) end)
end,
- case BQ:depth(BQS) - BQ:len(BQS) of
- 0 -> case rabbit_mirror_queue_master:sync_mirrors(
- HandleInfo, EmitStats, BQS) of
- {ok, BQS1} -> reply(ok, S(BQS1));
- {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
- end;
- _ -> reply({error, pending_acks}, State)
+ case rabbit_mirror_queue_master:sync_mirrors(HandleInfo, EmitStats, BQS) of
+ {ok, BQS1} -> reply(ok, S(BQS1));
+ {stop, Reason, BQS1} -> {stop, Reason, S(BQS1)}
end;
handle_call(sync_mirrors, _From, State) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9f12b34e..b63fccc9 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -230,7 +230,6 @@ handle_cast({sync_start, Ref, Syncer},
S = fun({TRefN, BQSN}) -> State1#state{depth_delta = undefined,
rate_timer_ref = TRefN,
backing_queue_state = BQSN} end,
- %% [0] We can only sync when there are no pending acks
case rabbit_mirror_queue_sync:slave(
DD, Ref, TRef, Syncer, BQ, BQS,
fun (BQN, BQSN) ->
@@ -240,7 +239,7 @@ handle_cast({sync_start, Ref, Syncer},
{TRefN, BQSN1}
end) of
denied -> noreply(State1);
- {ok, Res} -> noreply(set_delta(0, S(Res))); %% [0]
+ {ok, Res} -> noreply(set_delta(0, S(Res)));
{failed, Res} -> noreply(S(Res));
{stop, Reason, Res} -> {stop, Reason, S(Res)}
end;
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 4d6b1fc9..b023823e 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -91,16 +91,16 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, false, Acc) ->
- master_send(Msg, MsgProps, Args, Acc)
+ case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
+ master_send(Msg, MsgProps, Unacked, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
end.
-master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
- {I, Last}) ->
+master_send(Msg, MsgProps, Unacked,
+ {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
@@ -119,7 +119,7 @@ master_send(Msg, MsgProps, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent},
cancel_sync_mirrors} -> stop_syncer(Syncer, {cancel, Ref}),
gen_server2:reply(From, ok),
{stop, cancelled};
- {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps},
+ {next, Ref} -> Syncer ! {msg, Ref, Msg, MsgProps, Unacked},
{cont, {I + 1, T}};
{'EXIT', Parent, Reason} -> {stop, {shutdown, Reason}};
{'EXIT', Syncer, Reason} -> {stop, {sync_died, Reason}}
@@ -164,11 +164,11 @@ syncer(Ref, Log, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids) ->
MPid ! {next, Ref},
receive
- {msg, Ref, Msg, MsgProps} ->
+ {msg, Ref, Msg, MsgProps, Unacked} ->
SPids1 = wait_for_credit(SPids),
[begin
credit_flow:send(SPid),
- SPid ! {sync_msg, Ref, Msg, MsgProps}
+ SPid ! {sync_msg, Ref, Msg, MsgProps, Unacked}
end || SPid <- SPids1],
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
@@ -204,7 +204,7 @@ slave(0, Ref, _TRef, Syncer, _BQ, _BQS, _UpdateRamDuration) ->
slave(_DD, Ref, TRef, Syncer, BQ, BQS, UpdateRamDuration) ->
MRef = erlang:monitor(process, Syncer),
Syncer ! {sync_ready, Ref, self()},
- {_MsgCount, BQS1} = BQ:purge(BQS),
+ {_MsgCount, BQS1} = BQ:purge(BQ:purge_acks(BQS)),
slave_sync_loop({Ref, MRef, Syncer, BQ, UpdateRamDuration,
rabbit_misc:get_parent()}, TRef, BQS1).
@@ -237,10 +237,16 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
update_ram_duration ->
{TRef1, BQS1} = UpdateRamDuration(BQ, BQS),
slave_sync_loop(Args, TRef1, BQS1);
- {sync_msg, Ref, Msg, Props} ->
+ {sync_msg, Ref, Msg, Props, Unacked} ->
credit_flow:ack(Syncer),
Props1 = Props#message_properties{needs_confirming = false},
- BQS1 = BQ:publish(Msg, Props1, true, none, BQS),
+ BQS1 = case Unacked of
+ false -> BQ:publish(Msg, Props1, true, none, BQS);
+ true -> {_AckTag, BQS2} = BQ:publish_delivered(
+ Msg, Props1, none, BQS),
+ %% TODO do something w AckTag
+ BQS2
+ end,
slave_sync_loop(Args, TRef, BQS1);
{'EXIT', Parent, Reason} ->
{stop, Reason, {TRef, BQS}};