diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-05-20 17:46:44 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-05-20 17:46:44 +0100 |
commit | 6087e31872aa8a03283ceef3bc8e03774bf54200 (patch) | |
tree | a528df952d1e0fb99dd16c0bbcc830bdee6d9988 | |
parent | 8e01c9b20223d9ae91212bd6fe9e067153d0c124 (diff) | |
download | rabbitmq-server-6087e31872aa8a03283ceef3bc8e03774bf54200.tar.gz |
Fold publish and republish together.
-rw-r--r-- | src/rabbit_basic.erl | 81 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 4 |
2 files changed, 40 insertions, 45 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 355e390e..91bdf826 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([publish/1, message/3, message/4, properties/1, delivery/5]). --export([publish/4, publish/7, republish/4, republish/7]). +-export([publish/4, publish/7]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -48,21 +48,16 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> publish_result()). + (rabbit_types:exchange() | rabbit_exchange:name(), + rabbit_router:routing_key(), properties_input(), + binary() | [binary()]) -> publish_result()). -spec(publish/7 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - properties_input(), binary()) -> publish_result()). --spec(republish/4 :: - (rabbit_types:exchange(), rabbit_router:routing_key(), - properties_input(), [binary()]) -> publish_result()). --spec(republish/7 :: - (rabbit_types:exchange(), rabbit_router:routing_key(), - boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - properties_input(), [binary()]) -> publish_result()). --spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> - rabbit_types:content()). + (rabbit_types:exchange() | rabbit_exchange:name(), + rabbit_router:routing_key(), boolean(), boolean(), + rabbit_types:maybe(rabbit_types:txn()), properties_input(), + binary() | [binary()]) -> publish_result()). +-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), + binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). @@ -73,13 +68,14 @@ publish(Delivery = #delivery{ message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of - {ok, X} -> - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), - {ok, RoutingRes, DeliveredQPids}; - Other -> - Other + {ok, X} -> publish(X, Delivery); + Other -> Other end. +publish(X, Delivery) -> + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}. + delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. @@ -136,9 +132,9 @@ message(ExchangeName, RoutingKey, {error, _Reason} = Error -> Error end. -message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> +message(ExchangeName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), - Content = build_content(Properties, BodyBin), + Content = build_content(Properties, Body), {ok, Msg} = message(ExchangeName, RoutingKey, Content), Msg. @@ -163,32 +159,31 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) -> - publish(ExchangeName, RoutingKeyBin, false, false, none, Properties, +publish(Exchange, RoutingKeyBin, Properties, BodyBin) -> + publish(Exchange, RoutingKeyBin, false, false, none, Properties, BodyBin). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, +publish(Exchange, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> - publish(delivery(Mandatory, Immediate, Txn, - message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin), - undefined)). - -%% It's faster if you already have an exchange and a message not to -%% look up the exchange and disassemble and reassemble fragments -republish(X, RoutingKey, Props, PFR) -> - republish(X, RoutingKey, false, false, none, Props, PFR). - -%% It's faster if you already have an exchange and a message not to -%% look up the exchange and disassemble and reassemble fragments -republish(X = #exchange{name = XName}, - RoutingKey, Mandatory, Immediate, Txn, Props, PFR) -> - {ok, Msg} = message(XName, RoutingKey, build_content(Props, PFR)), - Delivery = delivery(Mandatory, Immediate, Txn, Msg, undefined), - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), - {ok, RoutingRes, DeliveredQPids}. + case exchange(Exchange) of + X = #exchange{} -> + publish(delivery(Mandatory, Immediate, Txn, + message(X#exchange.name, RoutingKeyBin, + properties(Properties), BodyBin), + undefined)); + _ -> + {ok, unroutable, []} + end. + +exchange(X = #exchange{}) -> + X; +exchange(N = #resource{kind = exchange}) -> + case rabbit_exchange:lookup(N) of + {ok, X} -> X; + Err -> Err + end. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index e0681f15..cf8ee64c 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -27,7 +27,7 @@ -ifdef(use_specs). --type(state() :: rabbit_exchange:name() | 'none'). +-type(state() :: rabbit_types:exchange() | 'none'). -spec(init/1 :: (rabbit_types:vhost()) -> state()). -spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). @@ -86,7 +86,7 @@ maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, maybe_trace(X, Msg = #basic_message{content = #content{ payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - {ok, _, _} = rabbit_basic:republish( + {ok, _, _} = rabbit_basic:publish( X, <<RKPrefix/binary, ".", RKSuffix/binary>>, #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), ok. |