diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-03 09:20:39 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-10-03 09:20:39 +0100 |
commit | 55a89d13cb3f6d7d7cfe2abe8737c8b0296eed1e (patch) | |
tree | f8dec7d2e631a096a3ca0eea87f36292575766e7 | |
parent | 1acaaffbbb9b8749b180a6f0b10b74cf6c252cba (diff) | |
download | rabbitmq-server-55a89d13cb3f6d7d7cfe2abe8737c8b0296eed1e.tar.gz |
distinguish between deliveries to masters and slaves
...thus allowing us to set the 'delivered' flag correctly for messages
that were in flight during a promotion
-rw-r--r-- | src/rabbit_amqqueue.erl | 42 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 5 |
3 files changed, 35 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 8fc103e4..7e857fc7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -622,29 +622,43 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> %% returned. It is therefore safe to use a fire-and-forget cast %% here and return the QPids - the semantics is preserved. This %% scales much better than the case below. - QPids = qpids(Qs), + {MPids, SPids} = qpids(Qs), + QPids = MPids ++ SPids, case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; noflow -> ok end, - delegate:invoke_no_result( - QPids, fun (QPid) -> - gen_server2:cast(QPid, {deliver, Delivery, Flow}) - end), + MMsg = {deliver, Delivery, false, Flow}, + SMsg = {deliver, Delivery, true, Flow}, + delegate:invoke_no_result(MPids, + fun (QPid) -> gen_server2:cast(QPid, MMsg) end), + delegate:invoke_no_result(SPids, + fun (QPid) -> gen_server2:cast(QPid, SMsg) end), {routed, QPids}; deliver(Qs, Delivery, _Flow) -> - case delegate:invoke( - qpids(Qs), fun (QPid) -> - ok = gen_server2:call(QPid, {deliver, Delivery}, - infinity) - end) of - {[], _} -> {unroutable, []}; - {R , _} -> {routed, [QPid || {QPid, ok} <- R]} + {MPids, SPids} = qpids(Qs), + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, + {MRouted, _} = delegate:invoke( + MPids, fun (QPid) -> + ok = gen_server2:call(QPid, MMsg, infinity) + end), + {SRouted, _} = delegate:invoke( + SPids, fun (QPid) -> + ok = gen_server2:call(QPid, SMsg, infinity) + end), + case MRouted ++ SRouted of + [] -> {unroutable, []}; + R -> {routed, [QPid || {QPid, ok} <- R]} end. -qpids(Qs) -> lists:append([[QPid | SPids] || - #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). +qpids(Qs) -> + {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids}, + {MPidAcc, SPidAcc}) -> + {[QPid | MPidAcc], [SPids | SPidAcc]} + end, {[], []}, Qs), + {MPids, lists:append(SPids)}. safe_delegate_call_ok(F, Pids) -> {_, Bads} = delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dfd0ab7e..a22e32b0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1030,10 +1030,10 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver, Delivery}, From, State) -> +handle_call({deliver, Delivery, Delivered}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, false, State)); + noreply(deliver_or_enqueue(Delivery, Delivered, State)); handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1193,7 +1193,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of @@ -1202,7 +1202,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, false, State1)); + noreply(deliver_or_enqueue(Delivery, Delivered, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c74470f6..e4ff79ee 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -166,7 +166,7 @@ init_it(Self, Node, QueueName) -> add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( Q#amqqueue{slave_pids = MPids ++ [New]}). -handle_call({deliver, Delivery}, From, State) -> +handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), noreply(maybe_enqueue_message(Delivery, State)); @@ -220,7 +220,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, + State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); |