diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-16 18:09:57 +0100 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-16 18:09:57 +0100 |
commit | b5852ad1eed9aa08f7c0f3cb589be8d6a9f0f38a (patch) | |
tree | 4604f52ebd7fdf3f839c507ded0640f97df35e91 /src | |
parent | c39984c49e51597b914dfe881026305d948490cb (diff) | |
download | rabbitmq-server-b5852ad1eed9aa08f7c0f3cb589be8d6a9f0f38a.tar.gz |
Checking for cycles when sending instead of when receiving.
This seems to be the best options and it should solve the HA problems.
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 58 |
1 files changed, 32 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3caf728b..e4dd3206 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -731,15 +731,25 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> State) end. +dead_letter_publish(Msg, Reason, + State = #q{publish_seqno = MsgSeqNo, + dlx = DLX}) -> + Delivery = #delivery{message = #basic_message{exchange_name = XName}} = + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo), + {ok, X} = rabbit_exchange:lookup(XName), + QueueNames = rabbit_exchange:route(X, Delivery), + {QueueNames1, Cycles} = detect_dead_letter_cycles(Delivery, QueueNames), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(QueueNames1), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids. + dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> - {ok, _, QPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), + unconfirmed = UC}) -> + QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), publish_seqno = MsgSeqNo + 1}, @@ -797,28 +807,30 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -already_been_here(_Delivery, #q{dlx = undefined}) -> - false; -already_been_here(#delivery{message = #basic_message{content = Content}}, - State) -> +detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, + QueueNames) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), - #resource{name = QueueName} = qname(State), + NoCycles = {QueueNames, []}, case Headers of undefined -> - false; + NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of {array, DeathTables} -> OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || {table, D} <- DeathTables], OldQueues1 = [QName || {longstr, QName} <- OldQueues], - case lists:member(QueueName, OldQueues1) of - true -> [QueueName | OldQueues1]; - _ -> false - end; + OldQueuesSet = ordsets:from_list(OldQueues1), + {Cycling, NotCycling} = + lists:partition( + fun(QueueName) -> + ordsets:is_element(QueueName, OldQueuesSet) + end, QueueNames), + {NotCycling, + lists:map(fun (Q) -> [Q | OldQueues1] end, Cycling)}; _ -> - false + QueueNames end end. @@ -1196,8 +1208,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. Senders1 = case Flow of @@ -1206,12 +1217,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender, noflow -> Senders end, State1 = State#q{senders = Senders1}, - case already_been_here(Delivery, State1) of - false -> noreply(deliver_or_enqueue(Delivery, State1)); - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State1) - end; + noreply(deliver_or_enqueue(Delivery, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( |