diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 12:22:46 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-22 12:22:46 +0000 |
commit | 0fdf9424c18fc944480d1d470b929830379e5d36 (patch) | |
tree | 5f2d5a9b025fbae218e4a337706c476b91bbe318 | |
parent | e9cdeca755062af54b1034f7b1e371f25225ab86 (diff) | |
download | rabbitmq-server-0fdf9424c18fc944480d1d470b929830379e5d36.tar.gz |
Fix basic.return stats, and fix various problems found by dialyzer.
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 18 | ||||
-rw-r--r-- | src/rabbit_dead_letter.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 |
5 files changed, 10 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0fd94539..c81c7688 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -51,7 +51,7 @@ -ifdef(use_specs). --export_type([name/0, qmsg/0, routing_result/0]). +-export_type([name/0, qmsg/0]). -type(name() :: rabbit_types:r('queue')). -type(qpids() :: [pid()]). @@ -61,7 +61,6 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_absent() :: rabbit_types:amqqueue() | {'absent', rabbit_types:amqqueue()}). -type(not_found_or_absent() :: 'not_found' | @@ -138,9 +137,9 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - {routing_result(), qpids()}). + qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 454755e6..927489da 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -31,8 +31,7 @@ -type(properties_input() :: (rabbit_framing:amqp_property_record() | [{atom(), any()}])). -type(publish_result() :: - ({ok, rabbit_amqqueue:routing_result(), [pid()]} - | rabbit_types:error('not_found'))). + ({ok, [pid()]} | rabbit_types:error('not_found'))). -type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ce0adb2b..e24b8d4a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -350,10 +350,8 @@ handle_cast(force_event_refresh, State) -> %% TODO duplication? handle_cast({mandatory_received, MsgSeqNo}, State) -> - State1 = #ch{mandatory = M} = handle_mandatory(MsgSeqNo, State), - Timeout = case M of [] -> hibernate; _ -> 0 end, %% NB: don't call noreply/1 since we don't want to send confirms. - {noreply, ensure_stats_timer(State1), Timeout}; + {noreply, ensure_stats_timer(handle_mandatory(MsgSeqNo, State)), hibernate}; handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), @@ -1354,7 +1352,9 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}, - #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + State = #ch{protocol = Protocol, writer_pid = WriterPid}, + Reason) -> + ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1557,8 +1557,6 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ publish, State1), State1. -%% TODO unbreak basic.return stats - process_routing_mandatory(_, false, _MsgSeqNo, _Msg, State) -> State; process_routing_mandatory([], true, _MsgSeqNo, Msg, State) -> @@ -1576,14 +1574,6 @@ process_routing_confirm(QPids, true, MsgSeqNo, XName, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. -%% process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> -%% ok = basic_return(Msg, State, no_route), -%% ?INCR_STATS([{exchange_stats, XName, 1}], return_unroutable, State), -%% case MsgSeqNo of -%% undefined -> State; -%% _ -> record_confirms([{MsgSeqNo, XName}], State) -%% end. - send_nacks([], State) -> State; send_nacks(_MXs, State = #ch{state = closing, diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 640b282e..b8a2cc9c 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -34,7 +34,7 @@ publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), - Delivery = rabbit_basic:delivery(false, DLMsg, undefined), + Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), {Queues, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 054db8ae..dfb5794c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2409,7 +2409,7 @@ publish_and_confirm(Q, Payload, Count) -> Payload), Delivery = #delivery{mandatory = false, sender = self(), message = Msg, msg_seq_no = Seq}, - {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) + _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). |