diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-05-19 21:28:24 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-05-19 21:28:24 +0100 |
commit | 244bd0da5e65d1e811930ec9566d6c20318aa4c7 (patch) | |
tree | 3aac9d0bba9ecea4769c373f0a4a6ce19dac5f2c | |
parent | 98d90838d5810cf99f63224bf92e715c18da4659 (diff) | |
download | rabbitmq-server-244bd0da5e65d1e811930ec9566d6c20318aa4c7.tar.gz |
move publication logic, incl ume handling, into rabbit_exchange
...and make rabbit_exchange:simple_publish use it
-rw-r--r-- | src/rabbit_channel.erl | 67 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 47 |
2 files changed, 56 insertions, 58 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4a3cb285..5954a1d4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -319,12 +319,25 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, true -> rabbit_guid:guid(); false -> none end, - Handled = publish(Exchange, Mandatory, Immediate, TxnKey, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - persistent_key = PersistentKey}, - WriterPid), + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + persistent_key = PersistentKey}, + Handled = + case rabbit_exchange:publish(Exchange, Mandatory, Immediate, TxnKey, + Message) of + {ok, DeliveredQPids} -> DeliveredQPids; + {error, unroutable, DeliveredQPids} -> + %% FIXME: 312 should be replaced by the ?NO_ROUTE + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), + DeliveredQPids; + {error, not_delivered, DeliveredQPids} -> + %% FIXME: 313 should be replaced by the ?NO_CONSUMERS + %% definition, when we move to >=0-9 + ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), + DeliveredQPids + end, {noreply, case TxnKey of none -> State; _ -> add_tx_participants(Handled, State) @@ -773,48 +786,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, ok -> return_ok(State, NoWait, ReturnMethod) end. -publish(X, Mandatory, Immediate, Txn, - Message = #basic_message{routing_key = RK, content = C}, WriterPid) -> - case deliver(rabbit_exchange:route(X, RK, C), Mandatory, Immediate, - Txn, Message, WriterPid) of - [] -> - case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of - {value, {_, longstr, UmeNameBin}} -> - XName = X#exchange.name, - UmeName = rabbit_misc:r(XName, exchange, UmeNameBin), - case rabbit_exchange:lookup(UmeName) of - {ok, Ume} -> - publish(Ume, false, false, Txn, Message, WriterPid); - {error, not_found} -> - rabbit_log:warning( - "unroutable message exchange for ~s " - "does not exist: ~s", - [rabbit_misc:rs(XName), - rabbit_misc:rs(UmeName)]), - [] - end; - false -> - [] - end; - Handled -> - Handled - end. - -deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) -> - case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of - {ok, DeliveredQPids} -> DeliveredQPids; - {error, unroutable} -> - %% FIXME: 312 should be replaced by the ?NO_ROUTE - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 312, <<"unroutable">>), - []; - {error, not_delivered} -> - %% FIXME: 313 should be replaced by the ?NO_CONSUMERS - %% definition, when we move to >=0-9 - ok = basic_return(Message, WriterPid, 313, <<"not_delivered">>), - [] - end. - basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 3b6338c7..2fccd0ce 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -36,7 +36,7 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, - simple_publish/6, simple_publish/3, + publish/5, simple_publish/6, simple_publish/3, route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). @@ -58,7 +58,7 @@ -ifdef(use_specs). -type(publish_res() :: {'ok', [pid()]} | - not_found() | {'error', 'unroutable' | 'not_delivered'}). + {'error', 'not_found' | 'unroutable' | 'not_delivered', [pid()]}). -type(bind_res() :: 'ok' | {'error', 'queue_not_found' | 'exchange_not_found' | @@ -75,6 +75,8 @@ -spec(info/2 :: (exchange(), [info_key()]) -> [info()]). -spec(info_all/1 :: (vhost()) -> [[info()]]). -spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]). +-spec(publish/5 :: (exchange(), bool(), bool(), maybe(txn()), message()) -> + publish_res()). -spec(simple_publish/6 :: (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). @@ -196,6 +198,36 @@ info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). +publish(X, Mandatory, Immediate, Txn, + Message = #basic_message{routing_key = RK, content = C}) -> + case rabbit_router:deliver(route(X, RK, C), + Mandatory, Immediate, Txn, Message) of + R = {ok, [_|_]} -> + R; + {ok, []} -> + {ok, _} = handle_unrouted(X, Txn, Message); + {error, Error} -> + {ok, DeliveredQPids} = handle_unrouted(X, Txn, Message), + {error, Error, DeliveredQPids} + end. + +handle_unrouted(#exchange{name = XName, arguments = Args}, Txn, Message) -> + case lists:keysearch(<<"ume">>, 1, Args) of + {value, {_, longstr, UmeNameBin}} -> + UmeName = rabbit_misc:r(XName, exchange, UmeNameBin), + case lookup(UmeName) of + {ok, Ume} -> + publish(Ume, false, false, Txn, Message); + {error, not_found} -> + rabbit_log:warning( + "unroutable message exchange for ~s does not exist: ~s", + [rabbit_misc:rs(XName), rabbit_misc:rs(UmeName)]), + {ok, []} + end; + false -> + {ok, []} + end. + %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> @@ -212,15 +244,10 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}) -> + Message = #basic_message{exchange_name = ExchangeName}) -> case lookup(ExchangeName) of - {ok, Exchange} -> - QPids = route(Exchange, RoutingKey, Content), - rabbit_router:deliver(QPids, Mandatory, Immediate, - none, Message); - {error, Error} -> {error, Error} + {ok, X} -> publish(X, Mandatory, Immediate, none, Message); + {error, Error} -> {error, Error, []} end. sort_arguments(Arguments) -> |