summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-16 15:20:09 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-16 15:20:09 +0100
commit2a33ca9aaa644b10eb1b0efab74b50e9ce6cc24c (patch)
tree9677a24746c5e9d6c166c6143b2250eb9adaf950
parent830e94ba3243c2e9fd7c20dcc69c730c804bcfa7 (diff)
parent7d1aa1dde7bb937bbecdcca71870fc7c90a9caff (diff)
downloadrabbitmq-server-2a33ca9aaa644b10eb1b0efab74b50e9ce6cc24c.tar.gz
merge default into bug25202
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl193
2 files changed, 78 insertions, 127 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b8aad11a..7ce69582 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -546,13 +546,13 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
case attempt_delivery(Delivery, Props, State1) of
{true, State2} ->
State2;
- %% the next one is an optimisations
- %% TODO: optimise the Confirm =/= never case too
- {false, State2 = #q{ttl = 0, dlx = undefined,
- backing_queue = BQ, backing_queue_state = BQS}}
- when Confirm == never ->
+ %% The next one is an optimisation
+ {false, State2 = #q{ttl = 0, dlx = undefined}} ->
+ %% fake an 'eventual' confirm from BQ; noop if not needed
+ State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ confirm_messages([Message#basic_message.id], State2),
BQS1 = BQ:discard(Message, SenderPid, BQS),
- State2#q{backing_queue_state = BQS1};
+ State3#q{backing_queue_state = BQS1};
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 7aa9c31f..303569e0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -64,7 +64,6 @@
-record(state, { q,
gm,
- master_pid,
backing_queue,
backing_queue_state,
sync_timer_ref,
@@ -87,7 +86,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
-init(#amqqueue { name = QueueName } = Q) ->
+init(Q = #amqqueue { name = QName }) ->
%% We join the GM group before we add ourselves to the amqqueue
%% record. As a result:
%% 1. We can receive msgs from GM that correspond to messages we will
@@ -100,23 +99,23 @@ init(#amqqueue { name = QueueName } = Q) ->
%% above.
%%
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ {ok, GM} = gm:start_link(QName, ?MODULE, [self()]),
receive {joined, GM} -> ok end,
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, GM, Node, QueueName) end) of
- {new, MPid} ->
- erlang:monitor(process, MPid),
+ fun() -> init_it(Self, GM, Node, QName) end) of
+ {new, QPid} ->
+ erlang:monitor(process, QPid),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
+ Q1 = Q #amqqueue { pid = QPid },
+ BQS = bq_init(BQ, Q1, false),
+ State = #state { q = Q1,
gm = GM,
- master_pid = MPid,
backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = undefined,
@@ -145,9 +144,9 @@ init(#amqqueue { name = QueueName } = Q) ->
ignore
end.
-init_it(Self, GM, Node, QueueName) ->
+init_it(Self, GM, Node, QName) ->
[Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] =
- mnesia:read({rabbit_queue, QueueName}),
+ mnesia:read({rabbit_queue, QName}),
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
{new, QPid};
@@ -179,30 +178,29 @@ handle_call({deliver, Delivery, true}, From, State) ->
noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
- State = #state { q = #amqqueue { name = QueueName },
- master_pid = MPid }) ->
- %% The GM has told us about deaths, which means we're not going to
- %% receive any more messages from GM
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
+ Self = self(),
+ rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
- if node(Pid) =:= node(MPid) ->
+ case Pid of
+ MPid ->
%% master hasn't changed
gen_server2:reply(From, ok),
noreply(State);
- node(Pid) =:= node() ->
+ Self ->
%% we've become master
QueueState = promote_me(From, State),
{become, rabbit_amqqueue_process, QueueState, hibernate};
- true ->
- %% master has changed to not us.
+ _ ->
+ %% master has changed to not us
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- noreply(State #state { master_pid = Pid })
+ noreply(State #state { q = Q #amqqueue { pid = Pid } })
end
end;
@@ -252,7 +250,7 @@ handle_info(timeout, State) ->
noreply(backing_queue_timeout(State));
handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
- State = #state { gm = GM, master_pid = MPid }) ->
+ State = #state { gm = GM, q = #amqqueue { pid = MPid } }) ->
ok = gm:broadcast(GM, process_death),
noreply(State);
@@ -375,7 +373,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
-i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid;
i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
@@ -394,14 +392,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -628,9 +632,8 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -640,28 +643,11 @@ maybe_enqueue_message(
MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
- {ok, confirmed} ->
- ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
- sender_queues = SQ1 };
- {ok, published} ->
- MS1 = case needs_confirming(Delivery, State1) of
- never -> dict:erase(MsgId, MS);
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS, MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- dict:erase(MsgId, MS)
- end,
+ {ok, Status} ->
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
- sender_queues = SQ1 };
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
end.
@@ -679,39 +665,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. But we cannot
- %% issues confirms until the latter has happened. So we need to
- %% keep track of the MsgId and its confirmation status in the
- %% meantime.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issues confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, published, MS)};
+ dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never -> MS;
- eventually -> MMS = {published, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS , MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- MS
- end};
+ send_or_record_confirm(Status, Delivery, MS, State1)};
{{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
@@ -720,52 +693,30 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
-
- {ok,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
- ChPid, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
- end};
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+
+process_instruction({publish, false, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ {ok, State1 #state { backing_queue_state = BQS1 }};
+process_instruction({publish, {true, AckRequired}, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {ok, maybe_store_ack(AckRequired, MsgId, AckTag,
+ State1 #state { backing_queue_state = BQS1 })};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, #delivery { message = #basic_message { id = MsgId } }},
- MQ2} ->
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, #delivery {}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->