summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-16 18:09:57 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-16 18:09:57 +0100
commitb5852ad1eed9aa08f7c0f3cb589be8d6a9f0f38a (patch)
tree4604f52ebd7fdf3f839c507ded0640f97df35e91 /src
parentc39984c49e51597b914dfe881026305d948490cb (diff)
downloadrabbitmq-server-b5852ad1eed9aa08f7c0f3cb589be8d6a9f0f38a.tar.gz
Checking for cycles when sending instead of when receiving.
This seems to be the best options and it should solve the HA problems.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl58
1 files changed, 32 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3caf728b..e4dd3206 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -731,15 +731,25 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
State)
end.
+dead_letter_publish(Msg, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ dlx = DLX}) ->
+ Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo),
+ {ok, X} = rabbit_exchange:lookup(XName),
+ QueueNames = rabbit_exchange:route(X, Delivery),
+ {QueueNames1, Cycles} = detect_dead_letter_cycles(Delivery, QueueNames),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(QueueNames1),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
+
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
- {ok, _, QPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
+ unconfirmed = UC}) ->
+ QPids = dead_letter_publish(Msg, Reason, State),
State1 = State#q{queue_monitors = pmon:monitor_all(
QPids, State#q.queue_monitors),
publish_seqno = MsgSeqNo + 1},
@@ -797,28 +807,30 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-already_been_here(_Delivery, #q{dlx = undefined}) ->
- false;
-already_been_here(#delivery{message = #basic_message{content = Content}},
- State) ->
+detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
+ QueueNames) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
- #resource{name = QueueName} = qname(State),
+ NoCycles = {QueueNames, []},
case Headers of
undefined ->
- false;
+ NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
{array, DeathTables} ->
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
{table, D} <- DeathTables],
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- case lists:member(QueueName, OldQueues1) of
- true -> [QueueName | OldQueues1];
- _ -> false
- end;
+ OldQueuesSet = ordsets:from_list(OldQueues1),
+ {Cycling, NotCycling} =
+ lists:partition(
+ fun(QueueName) ->
+ ordsets:is_element(QueueName, OldQueuesSet)
+ end, QueueNames),
+ {NotCycling,
+ lists:map(fun (Q) -> [Q | OldQueues1] end, Cycling)};
_ ->
- false
+ QueueNames
end
end.
@@ -1196,8 +1208,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
Senders1 = case Flow of
@@ -1206,12 +1217,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender,
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- case already_been_here(Delivery, State1) of
- false -> noreply(deliver_or_enqueue(Delivery, State1));
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State1)
- end;
+ noreply(deliver_or_enqueue(Delivery, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(