summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-20 06:25:28 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-20 06:25:28 +0100
commit5560bd99752526f0c8ab6f5fe145e052d7ab98be (patch)
tree12e90e652f8e1198f1cfb72bbdf7f1f1712b7233
parent244bd0da5e65d1e811930ec9566d6c20318aa4c7 (diff)
downloadrabbitmq-server-5560bd99752526f0c8ab6f5fe145e052d7ab98be.tar.gz
rationalise publish/route result types
-rw-r--r--include/rabbit.hrl1
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl34
-rw-r--r--src/rabbit_router.erl25
5 files changed, 47 insertions, 47 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index c707112f..ffda0698 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -143,6 +143,7 @@
host :: string() | atom(),
port :: non_neg_integer()}).
-type(not_found() :: {'error', 'not_found'}).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-endif.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5954a1d4..8ec25ad5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -323,24 +323,24 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- Handled =
- case rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
- Message) of
- {ok, DeliveredQPids} -> DeliveredQPids;
- {error, unroutable, DeliveredQPids} ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
- DeliveredQPids;
- {error, not_delivered, DeliveredQPids} ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
- DeliveredQPids
- end,
+ {RoutingRes, DeliveredQPids} =
+ rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
+ Message),
+ case RoutingRes of
+ routed ->
+ ok;
+ unroutable ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>);
+ not_delivered ->
+ %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>)
+ end,
{noreply, case TxnKey of
none -> State;
- _ -> add_tx_participants(Handled, State)
+ _ -> add_tx_participants(DeliveredQPids, State)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1..bc14fdb8 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -74,7 +74,7 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
- {ok, _QueueNames} = rabbit_exchange:simple_publish(
+ {ok, _RoutingRes} = rabbit_exchange:simple_publish(
false, false, LogExch, RoutingKey, <<"text/plain">>,
list_to_binary(io_lib:format(Format, Data))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 2fccd0ce..f644d710 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,8 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- publish/5, simple_publish/6, simple_publish/3,
- route/3]).
+ publish/5, simple_publish/6, simple_publish/3, route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
@@ -57,8 +56,6 @@
-ifdef(use_specs).
--type(publish_res() :: {'ok', [pid()]} |
- {'error', 'not_found' | 'unroutable' | 'not_delivered', [pid()]}).
-type(bind_res() :: 'ok' | {'error',
'queue_not_found' |
'exchange_not_found' |
@@ -76,11 +73,12 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
- publish_res()).
+ {routing_result(), [pid()]}).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
- publish_res()).
--spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
+ {ok, routing_result()} | not_found()).
+-spec(simple_publish/3 :: (bool(), bool(), message()) ->
+ {ok, routing_result()} | not_found()).
-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
@@ -202,13 +200,11 @@ publish(X, Mandatory, Immediate, Txn,
Message = #basic_message{routing_key = RK, content = C}) ->
case rabbit_router:deliver(route(X, RK, C),
Mandatory, Immediate, Txn, Message) of
- R = {ok, [_|_]} ->
- R;
- {ok, []} ->
- {ok, _} = handle_unrouted(X, Txn, Message);
- {error, Error} ->
- {ok, DeliveredQPids} = handle_unrouted(X, Txn, Message),
- {error, Error, DeliveredQPids}
+ {RoutingRes, []} ->
+ {routed, DeliveredQPids} = handle_unrouted(X, Txn, Message),
+ {RoutingRes, DeliveredQPids};
+ Other ->
+ Other
end.
handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
@@ -222,10 +218,10 @@ handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
rabbit_log:warning(
"unroutable message exchange for ~s does not exist: ~s",
[rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]),
- {ok, []}
+ {routed, []}
end;
false ->
- {ok, []}
+ {routed, []}
end.
%% Usable by Erlang code that wants to publish messages.
@@ -246,8 +242,10 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName}) ->
case lookup(ExchangeName) of
- {ok, X} -> publish(X, Mandatory, Immediate, none, Message);
- {error, Error} -> {error, Error, []}
+ {ok, X} -> {RoutingRes, _} = publish(X, Mandatory, Immediate, none,
+ Message),
+ {ok, RoutingRes};
+ Other -> Other
end.
sort_arguments(Arguments) ->
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 0b06a063..57166428 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -51,7 +51,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(deliver/5 :: ([pid()], bool(), bool(), maybe(txn()), message()) ->
- {'ok', [pid()]} | {'error', 'unroutable' | 'not_delivered'}).
+ {routing_result(), [pid()]}).
-endif.
@@ -98,14 +98,15 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% therefore safe to use a fire-and-forget cast here and return
%% the QPids - the semantics is preserved. This scales much better
%% than the non-immediate case below.
- {ok, lists:flatmap(
- fun ({Node, QPids}) ->
- gen_server2:cast(
- {?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message}),
- QPids
- end,
- NodeQPids)};
+ {routed,
+ lists:flatmap(
+ fun ({Node, QPids}) ->
+ gen_server2:cast(
+ {?SERVER, Node},
+ {deliver, QPids, Mandatory, Immediate, Txn, Message}),
+ QPids
+ end,
+ NodeQPids)};
deliver_per_node(NodeQPids, Mandatory, Immediate,
Txn, Message) ->
R = rabbit_misc:upmap(
@@ -179,6 +180,6 @@ run_bindings(QPids, IsMandatory, IsImmediate, Txn, Message) ->
QPids).
%% check_delivery(Mandatory, Immediate, {WasRouted, QPids})
-check_delivery(true, _ , {false, []}) -> {error, unroutable};
-check_delivery(_ , true, {_ , []}) -> {error, not_delivered};
-check_delivery(_ , _ , {_ , Qs}) -> {ok, Qs}.
+check_delivery(true, _ , {false, []}) -> {unroutable, []};
+check_delivery(_ , true, {_ , []}) -> {not_delivered, []};
+check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}.