diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-05-20 06:25:28 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-05-20 06:25:28 +0100 |
commit | 5560bd99752526f0c8ab6f5fe145e052d7ab98be (patch) | |
tree | 12e90e652f8e1198f1cfb72bbdf7f1f1712b7233 | |
parent | 244bd0da5e65d1e811930ec9566d6c20318aa4c7 (diff) | |
download | rabbitmq-server-5560bd99752526f0c8ab6f5fe145e052d7ab98be.tar.gz |
rationalise publish/route result types
-rw-r--r-- | include/rabbit.hrl | 1 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
-rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 34 | ||||
-rw-r--r-- | src/rabbit_router.erl | 25 |
5 files changed, 47 insertions, 47 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index c707112f..ffda0698 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -143,6 +143,7 @@ host :: string() | atom(), port :: non_neg_integer()}). -type(not_found() :: {'error', 'not_found'}). +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -endif. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5954a1d4..8ec25ad5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -323,24 +323,24 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - Handled = - case rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, - Message) of - {ok, DeliveredQPids} -> DeliveredQPids; - {error, unroutable, DeliveredQPids} -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), - DeliveredQPids; - {error, not_delivered, DeliveredQPids} -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), - DeliveredQPids - end, + {RoutingRes, DeliveredQPids} = + rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, + Message), + case RoutingRes of + routed -> + ok; + unroutable -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>); + not_delivered -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>) + end, {noreply, case TxnKey of none -> State; - _ -> add_tx_participants(Handled, State) + _ -> add_tx_participants(DeliveredQPids, State) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index dc5824f1..bc14fdb8 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -74,7 +74,7 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> - {ok, _QueueNames} = rabbit_exchange:simple_publish( + {ok, _RoutingRes} = rabbit_exchange:simple_publish( false, false, LogExch, RoutingKey, <<"text/plain">>, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2fccd0ce..f644d710 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,8 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - publish/5, simple_publish/6, simple_publish/3, - route/3]). + publish/5, simple_publish/6, simple_publish/3, route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). @@ -57,8 +56,6 @@ -ifdef(use_specs). --type(publish_res() :: {'ok', [pid()]} | - {'error', 'not_found' | 'unroutable' | 'not_delivered', [pid()]}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found' | @@ -76,11 +73,12 @@ -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). -spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> - publish_res()). + {routing_result(), [pid()]}). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> - publish_res()). --spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). + {ok, routing_result()} | not_found()). +-spec(simple_publish/3 :: (bool(), bool(), message()) -> + {ok, routing_result()} | not_found()). -spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> @@ -202,13 +200,11 @@ publish(X, Mandatory, Immediate, Txn, Message = #basic_message{routing_key = RK, content = C}) -> case rabbit_router:deliver(route(X, RK, C), Mandatory, Immediate, Txn, Message) of - R = {ok, [_|_]} -> - R; - {ok, []} -> - {ok, _} = handle_unrouted(X, Txn, Message); - {error, Error} -> - {ok, DeliveredQPids} = handle_unrouted(X, Txn, Message), - {error, Error, DeliveredQPids} + {RoutingRes, []} -> + {routed, DeliveredQPids} = handle_unrouted(X, Txn, Message), + {RoutingRes, DeliveredQPids}; + Other -> + Other end. handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) -> @@ -222,10 +218,10 @@ handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) -> rabbit_log:warning( "unroutable message exchange for ~s does not exist: ~s", [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]), - {ok, []} + {routed, []} end; false -> - {ok, []} + {routed, []} end. %% Usable by Erlang code that wants to publish messages. @@ -246,8 +242,10 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName}) -> case lookup(ExchangeName) of - {ok, X} -> publish(X, Mandatory, Immediate, none, Message); - {error, Error} -> {error, Error, []} + {ok, X} -> {RoutingRes, _} = publish(X, Mandatory, Immediate, none, + Message), + {ok, RoutingRes}; + Other -> Other end. sort_arguments(Arguments) -> diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 0b06a063..57166428 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -51,7 +51,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) -> - {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}). + {routing_result(), [pid()]}). -endif. @@ -98,14 +98,15 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% therefore safe to use a fire-and-forget cast here and return %% the QPids - the semantics is preserved. This scales much better %% than the non-immediate case below. - {ok, lists:flatmap( - fun ({Node, QPids}) -> - gen_server2:cast( - {?SERVER, Node}, - {deliver, QPids, Mandatory, Immediate, Txn, Message}), - QPids - end, - NodeQPids)}; + {routed, + lists:flatmap( + fun ({Node, QPids}) -> + gen_server2:cast( + {?SERVER, Node}, + {deliver, QPids, Mandatory, Immediate, Txn, Message}), + QPids + end, + NodeQPids)}; deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( @@ -179,6 +180,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) -> QPids). %% check_delivery(Mandatory, Immediate, {WasRouted, QPids}) -check_delivery(true, _ , {false, []}) -> {error, unroutable}; -check_delivery(_ , true, {_ , []}) -> {error, not_delivered}; -check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}. +check_delivery(true, _ , {false, []}) -> {unroutable, []}; +check_delivery(_ , true, {_ , []}) -> {not_delivered, []}; +check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. |