summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-03 09:20:39 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-03 09:20:39 +0100
commit55a89d13cb3f6d7d7cfe2abe8737c8b0296eed1e (patch)
treef8dec7d2e631a096a3ca0eea87f36292575766e7
parent1acaaffbbb9b8749b180a6f0b10b74cf6c252cba (diff)
downloadrabbitmq-server-55a89d13cb3f6d7d7cfe2abe8737c8b0296eed1e.tar.gz
distinguish between deliveries to masters and slaves
...thus allowing us to set the 'delivered' flag correctly for messages that were in flight during a promotion
-rw-r--r--src/rabbit_amqqueue.erl42
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl5
3 files changed, 35 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 8fc103e4..7e857fc7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -622,29 +622,43 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
%% returned. It is therefore safe to use a fire-and-forget cast
%% here and return the QPids - the semantics is preserved. This
%% scales much better than the case below.
- QPids = qpids(Qs),
+ {MPids, SPids} = qpids(Qs),
+ QPids = MPids ++ SPids,
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
noflow -> ok
end,
- delegate:invoke_no_result(
- QPids, fun (QPid) ->
- gen_server2:cast(QPid, {deliver, Delivery, Flow})
- end),
+ MMsg = {deliver, Delivery, false, Flow},
+ SMsg = {deliver, Delivery, true, Flow},
+ delegate:invoke_no_result(MPids,
+ fun (QPid) -> gen_server2:cast(QPid, MMsg) end),
+ delegate:invoke_no_result(SPids,
+ fun (QPid) -> gen_server2:cast(QPid, SMsg) end),
{routed, QPids};
deliver(Qs, Delivery, _Flow) ->
- case delegate:invoke(
- qpids(Qs), fun (QPid) ->
- ok = gen_server2:call(QPid, {deliver, Delivery},
- infinity)
- end) of
- {[], _} -> {unroutable, []};
- {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
+ {MPids, SPids} = qpids(Qs),
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
+ {MRouted, _} = delegate:invoke(
+ MPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, MMsg, infinity)
+ end),
+ {SRouted, _} = delegate:invoke(
+ SPids, fun (QPid) ->
+ ok = gen_server2:call(QPid, SMsg, infinity)
+ end),
+ case MRouted ++ SRouted of
+ [] -> {unroutable, []};
+ R -> {routed, [QPid || {QPid, ok} <- R]}
end.
-qpids(Qs) -> lists:append([[QPid | SPids] ||
- #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+qpids(Qs) ->
+ {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids},
+ {MPidAcc, SPidAcc}) ->
+ {[QPid | MPidAcc], [SPids | SPidAcc]}
+ end, {[], []}, Qs),
+ {MPids, lists:append(SPids)}.
safe_delegate_call_ok(F, Pids) ->
{_, Bads} = delegate:invoke(Pids, fun (Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dfd0ab7e..a22e32b0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1030,10 +1030,10 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery}, From, State) ->
+handle_call({deliver, Delivery, Delivered}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
- noreply(deliver_or_enqueue(Delivery, false, State));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State));
handle_call({notify_down, ChPid}, From, State) ->
%% we want to do this synchronously, so that auto_deleted queues
@@ -1193,7 +1193,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
@@ -1202,7 +1202,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, false, State1));
+ noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c74470f6..e4ff79ee 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -166,7 +166,7 @@ init_it(Self, Node, QueueName) ->
add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
Q#amqqueue{slave_pids = MPids ++ [New]}).
-handle_call({deliver, Delivery}, From, State) ->
+handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
noreply(maybe_enqueue_message(Delivery, State));
@@ -220,7 +220,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+ State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);