summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-15 16:13:50 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-15 16:13:50 +0100
commit58c1346b1ddede2ea88c0b0e1623c145faa0f319 (patch)
treee47c7800f78950a64e31adb706471049a0e2d9c4
parent93129443584f3e310613e1d752be2cea0a10a013 (diff)
downloadrabbitmq-server-58c1346b1ddede2ea88c0b0e1623c145faa0f319.tar.gz
refactor: extract commonality between 'publish' and 'discard'
-rw-r--r--src/rabbit_mirror_queue_slave.erl103
1 files changed, 30 insertions, 73 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index b1b27c7e..aea3b54e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -387,13 +387,13 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+needs_confirming(_, #delivery{ msg_seq_no = undefined }, _State) ->
never;
-needs_confirming(#delivery { message = #basic_message {
- is_persistent = true } },
+needs_confirming(published, #delivery { message = #basic_message {
+ is_persistent = true } },
#state { q = #amqqueue { durable = true } }) ->
eventually;
-needs_confirming(_Delivery, _State) ->
+needs_confirming(_Status, _Delivery, _State) ->
immediately.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
@@ -638,10 +638,10 @@ maybe_enqueue_message(
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 };
- {ok, published} ->
- MS1 = case needs_confirming(Delivery, State1) of
+ {ok, Status} when Status =:= published orelse Status =:= discarded ->
+ MS1 = case needs_confirming(Status, Delivery, State1) of
never -> dict:erase(MsgId, MS);
- eventually -> MMS = {published, ChPid, MsgSeqNo},
+ eventually -> MMS = {Status, ChPid, MsgSeqNo},
dict:store(MsgId, MMS, MS);
immediately -> ok = rabbit_misc:confirm_to_sender(
ChPid, [MsgSeqNo]),
@@ -649,16 +649,6 @@ maybe_enqueue_message(
end,
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
- sender_queues = SQ1 };
- {ok, discarded} ->
- %% We've already heard from GM that the msg is to be
- %% discarded. We won't see this again.
- case needs_confirming(Delivery, State1) of
- never -> ok;
- _ -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo])
- end,
- SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
- State1 #state { msg_id_status = dict:erase(MsgId, MS),
sender_queues = SQ1 }
end.
@@ -676,34 +666,30 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
end.
-process_instruction(
- {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
-
- %% We really are going to do the publish right now, even though we
- %% may not have seen it directly from the channel. But we cannot
- %% issues confirms until the latter has happened. So we need to
- %% keep track of the MsgId and its confirmation status in the
- %% meantime.
+publish_or_discard(Status, ChPid, MsgId,
+ State = #state { sender_queues = SQ,
+ msg_id_status = MS }) ->
+ %% We really are going to do the publish/discard right now, even
+ %% though we may not have seen it directly from the channel. But
+ %% we cannot issues confirms until the latter has happened. So we
+ %% need to keep track of the MsgId and its confirmation status in
+ %% the meantime.
State1 = ensure_monitoring(ChPid, State),
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
{MQ1, PendingCh1, MS1} =
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, published, MS)};
+ dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
msg_seq_no = MsgSeqNo,
message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
+ case needs_confirming(Status, Delivery, State1) of
never -> MS;
- eventually -> MMS = {published, ChPid, MsgSeqNo},
+ eventually -> MMS = {Status, ChPid, MsgSeqNo},
dict:store(MsgId, MMS , MS);
immediately -> ok = rabbit_misc:confirm_to_sender(
ChPid, [MsgSeqNo]),
@@ -717,60 +703,31 @@ process_instruction(
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
-
SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
- State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
+ State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
+
+process_instruction({publish, Deliver, ChPid, MsgProps,
+ Msg = #basic_message { id = MsgId }}, State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(published, ChPid, MsgId, State),
{ok,
case Deliver of
false ->
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State2 #state { backing_queue_state = BQS1 };
+ State1 #state { backing_queue_state = BQS1 };
{true, AckRequired} ->
{AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
ChPid, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
- State2 #state { backing_queue_state = BQS1 })
+ State1 #state { backing_queue_state = BQS1 })
end};
process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
- State = #state { sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- msg_id_status = MS }) ->
- %% Many of the comments around the publish head above apply here
- %% too.
- State1 = ensure_monitoring(ChPid, State),
- {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- {MQ1, PendingCh1, MS1} =
- case queue:out(MQ) of
- {empty, _MQ} ->
- {MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, discarded, MS)};
- {{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
- %% We received the msg from the channel first. Thus
- %% we need to deal with confirms here.
- case needs_confirming(Delivery, State1) of
- never -> ok;
- _ -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo])
- end,
- %% We've already seen it from the channel, we're not
- %% going to see this again, so don't add it to MS
- {MQ2, PendingCh, MS};
- {{value, #delivery {}}, _MQ2} ->
- %% The instruction was sent to us before we were
- %% within the slave_pids within the #amqqueue{}
- %% record. We'll never receive the message directly
- %% from the channel.
- {MQ, PendingCh, MS}
- end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State) ->
+ State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
+ publish_or_discard(discarded, ChPid, MsgId, State),
BQS1 = BQ:discard(Msg, ChPid, BQS),
- {ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS1,
- backing_queue_state = BQS1 }};
+ {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->