summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-19 21:28:24 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-19 21:28:24 +0100
commit244bd0da5e65d1e811930ec9566d6c20318aa4c7 (patch)
tree3aac9d0bba9ecea4769c373f0a4a6ce19dac5f2c
parent98d90838d5810cf99f63224bf92e715c18da4659 (diff)
downloadrabbitmq-server-244bd0da5e65d1e811930ec9566d6c20318aa4c7.tar.gz
move publication logic, incl ume handling, into rabbit_exchange
...and make rabbit_exchange:simple_publish use it
-rw-r--r--src/rabbit_channel.erl67
-rw-r--r--src/rabbit_exchange.erl47
2 files changed, 56 insertions, 58 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4a3cb285..5954a1d4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -319,12 +319,25 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- Handled = publish(Exchange, Mandatory, Immediate, TxnKey,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = DecodedContent,
- persistent_key = PersistentKey},
- WriterPid),
+ Message = #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = DecodedContent,
+ persistent_key = PersistentKey},
+ Handled =
+ case rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey,
+ Message) of
+ {ok, DeliveredQPids} -> DeliveredQPids;
+ {error, unroutable, DeliveredQPids} ->
+ %% FIXME: 312 should be replaced by the ?NO_ROUTE
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
+ DeliveredQPids;
+ {error, not_delivered, DeliveredQPids} ->
+ %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
+ %% definition, when we move to >=0-9
+ ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
+ DeliveredQPids
+ end,
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(Handled, State)
@@ -773,48 +786,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(X, Mandatory, Immediate, Txn,
- Message = #basic_message{routing_key = RK, content = C}, WriterPid) ->
- case deliver(rabbit_exchange:route(X, RK, C), Mandatory, Immediate,
- Txn, Message, WriterPid) of
- [] ->
- case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of
- {value, {_, longstr, UmeNameBin}} ->
- XName = X#exchange.name,
- UmeName = rabbit_misc:r(XName, exchange, UmeNameBin),
- case rabbit_exchange:lookup(UmeName) of
- {ok, Ume} ->
- publish(Ume, false, false, Txn, Message, WriterPid);
- {error, not_found} ->
- rabbit_log:warning(
- "unroutable message exchange for ~s "
- "does not exist: ~s",
- [rabbit_misc:rs(XName),
- rabbit_misc:rs(UmeName)]),
- []
- end;
- false ->
- []
- end;
- Handled ->
- Handled
- end.
-
-deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
- case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
- {ok, DeliveredQPids} -> DeliveredQPids;
- {error, unroutable} ->
- %% FIXME: 312 should be replaced by the ?NO_ROUTE
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 312, <<"unroutable">>),
- [];
- {error, not_delivered} ->
- %% FIXME: 313 should be replaced by the ?NO_CONSUMERS
- %% definition, when we move to >=0-9
- ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>),
- []
- end.
-
basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content},
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 3b6338c7..2fccd0ce 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,7 +36,7 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
- simple_publish/6, simple_publish/3,
+ publish/5, simple_publish/6, simple_publish/3,
route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
@@ -58,7 +58,7 @@
-ifdef(use_specs).
-type(publish_res() :: {'ok', [pid()]} |
- not_found() | {'error', 'unroutable' | 'not_delivered'}).
+ {'error', 'not_found' | 'unroutable' | 'not_delivered', [pid()]}).
-type(bind_res() :: 'ok' | {'error',
'queue_not_found' |
'exchange_not_found' |
@@ -75,6 +75,8 @@
-spec(info/2 :: (exchange(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) ->
+ publish_res()).
-spec(simple_publish/6 ::
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
@@ -196,6 +198,36 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
+publish(X, Mandatory, Immediate, Txn,
+ Message = #basic_message{routing_key = RK, content = C}) ->
+ case rabbit_router:deliver(route(X, RK, C),
+ Mandatory, Immediate, Txn, Message) of
+ R = {ok, [_|_]} ->
+ R;
+ {ok, []} ->
+ {ok, _} = handle_unrouted(X, Txn, Message);
+ {error, Error} ->
+ {ok, DeliveredQPids} = handle_unrouted(X, Txn, Message),
+ {error, Error, DeliveredQPids}
+ end.
+
+handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) ->
+ case lists:keysearch(<<"ume">>, 1, Args) of
+ {value, {_, longstr, UmeNameBin}} ->
+ UmeName = rabbit_misc:r(XName, exchange, UmeNameBin),
+ case lookup(UmeName) of
+ {ok, Ume} ->
+ publish(Ume, false, false, Txn, Message);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "unroutable message exchange for ~s does not exist: ~s",
+ [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]),
+ {ok, []}
+ end;
+ false ->
+ {ok, []}
+ end.
+
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
ContentTypeBin, BodyBin) ->
@@ -212,15 +244,10 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
- Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}) ->
+ Message = #basic_message{exchange_name = ExchangeName}) ->
case lookup(ExchangeName) of
- {ok, Exchange} ->
- QPids = route(Exchange, RoutingKey, Content),
- rabbit_router:deliver(QPids, Mandatory, Immediate,
- none, Message);
- {error, Error} -> {error, Error}
+ {ok, X} -> publish(X, Mandatory, Immediate, none, Message);
+ {error, Error} -> {error, Error, []}
end.
sort_arguments(Arguments) ->