summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-14 16:54:23 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-14 16:54:23 +0000
commitb6f5d1dcd1e9465f84a4374ecdece7a37f51504b (patch)
tree7a55815d85b837a317a9e6b528461708c315b4ec /src/rabbit_mirror_queue_master.erl
parent279e858cac439e493fe3990a4ef0ef689a0ff29b (diff)
parent80854415c2e1579d52127b9722c985c77d0791e4 (diff)
downloadrabbitmq-server-b6f5d1dcd1e9465f84a4374ecdece7a37f51504b.tar.gz
merge default into bug23554
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl84
1 files changed, 47 insertions, 37 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 25a1e4b8..0ca73f03 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -16,10 +16,10 @@
-module(rabbit_mirror_queue_master).
--export([init/2, terminate/1, delete_and_terminate/1,
+-export([init/4, terminate/1, delete_and_terminate/1,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4,
- requeue/3, len/1, is_empty/1, dropwhile/2,
+ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, validate_message/2]).
@@ -37,7 +37,8 @@
backing_queue,
backing_queue_state,
set_delivered,
- seen_status
+ seen_status,
+ confirmed
}).
%% ---------------------------------------------------------------------------
@@ -53,7 +54,8 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { arguments = Args, name = QName } = Q, Recover) ->
+init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
+ AsyncCallback, SyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
@@ -64,13 +66,14 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover) ->
end,
[rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1],
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, Recover),
+ BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = 0,
- seen_status = dict:new() }.
+ seen_status = dict:new(),
+ confirmed = [] }.
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
#state { gm = GM,
@@ -78,7 +81,8 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = BQ:len(BQS),
- seen_status = SeenStatus }.
+ seen_status = SeenStatus,
+ confirmed = [] }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -138,6 +142,35 @@ dropwhile(Fun, State = #state { gm = GM,
State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 }.
+drain_confirmed(State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS,
+ confirmed = Confirmed }) ->
+ {MsgIds, BQS1} = BQ:drain_confirmed(BQS),
+ {MsgIds1, SS1} =
+ lists:foldl(
+ fun (MsgId, {MsgIdsN, SSN}) ->
+ case dict:find(MsgId, SSN) of
+ error ->
+ {[MsgId | MsgIdsN], SSN};
+ {ok, published} ->
+ %% It was published when we were a slave,
+ %% and we were promoted before we saw the
+ %% publish from the channel. We still
+ %% haven't seen the channel publish, and
+ %% consequently we need to filter out the
+ %% confirm here. We will issue the confirm
+ %% when we see the publish from the channel.
+ {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {ok, confirmed} ->
+ %% Well, confirms are racy by definition.
+ {[MsgId | MsgIdsN], SSN}
+ end
+ end, {[], SS}, MsgIds),
+ {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1,
+ seen_status = SS1,
+ confirmed = [] }}.
+
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -236,38 +269,16 @@ status(#state { backing_queue = BQ, backing_queue_state = BQS}) ->
BQ:status(BQS).
invoke(?MODULE, Fun, State) ->
- Fun(State);
+ Fun(?MODULE, State);
invoke(Mod, Fun, State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
- {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS),
- {MsgIds1, SS1} =
- lists:foldl(
- fun (MsgId, {MsgIdsN, SSN}) ->
- case dict:find(MsgId, SSN) of
- error ->
- {[MsgId | MsgIdsN], SSN};
- {ok, published} ->
- %% It was published when we were a slave,
- %% and we were promoted before we saw the
- %% publish from the channel. We still
- %% haven't seen the channel publish, and
- %% consequently we need to filter out the
- %% confirm here. We will issue the confirm
- %% when we see the publish from the channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
- {ok, confirmed} ->
- %% Well, confirms are racy by definition.
- {[MsgId | MsgIdsN], SSN}
- end
- end, {[], SS}, MsgIds),
- {MsgIds1, State #state { backing_queue_state = BQS1,
- seen_status = SS1 }}.
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
validate_message(Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ confirmed = Confirmed }) ->
%% Here, we need to deal with the possibility that we're about to
%% receive a message that we've already seen when we were a slave
%% (we received it via gm). Thus if we do receive such message now
@@ -299,7 +310,6 @@ validate_message(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately prior to calling validate_message/2.
- ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
- self(), ?MODULE, fun (State1) -> {[MsgId], State1} end),
- {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}
+ {invalid, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.