summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-05-20 17:46:44 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-05-20 17:46:44 +0100
commit6087e31872aa8a03283ceef3bc8e03774bf54200 (patch)
treea528df952d1e0fb99dd16c0bbcc830bdee6d9988
parent8e01c9b20223d9ae91212bd6fe9e067153d0c124 (diff)
downloadrabbitmq-server-6087e31872aa8a03283ceef3bc8e03774bf54200.tar.gz
Fold publish and republish together.
-rw-r--r--src/rabbit_basic.erl81
-rw-r--r--src/rabbit_trace.erl4
2 files changed, 40 insertions, 45 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 355e390e..91bdf826 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([publish/1, message/3, message/4, properties/1, delivery/5]).
--export([publish/4, publish/7, republish/4, republish/7]).
+-export([publish/4, publish/7]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -48,21 +48,16 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) -> publish_result()).
+ (rabbit_types:exchange() | rabbit_exchange:name(),
+ rabbit_router:routing_key(), properties_input(),
+ binary() | [binary()]) -> publish_result()).
-spec(publish/7 ::
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), binary()) -> publish_result()).
--spec(republish/4 ::
- (rabbit_types:exchange(), rabbit_router:routing_key(),
- properties_input(), [binary()]) -> publish_result()).
--spec(republish/7 ::
- (rabbit_types:exchange(), rabbit_router:routing_key(),
- boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), [binary()]) -> publish_result()).
--spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
- rabbit_types:content()).
+ (rabbit_types:exchange() | rabbit_exchange:name(),
+ rabbit_router:routing_key(), boolean(), boolean(),
+ rabbit_types:maybe(rabbit_types:txn()), properties_input(),
+ binary() | [binary()]) -> publish_result()).
+-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
+ binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
@@ -73,13 +68,14 @@
publish(Delivery = #delivery{
message = #basic_message{exchange_name = ExchangeName}}) ->
case rabbit_exchange:lookup(ExchangeName) of
- {ok, X} ->
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
- {ok, RoutingRes, DeliveredQPids};
- Other ->
- Other
+ {ok, X} -> publish(X, Delivery);
+ Other -> Other
end.
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
@@ -136,9 +132,9 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
+message(ExchangeName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
- Content = build_content(Properties, BodyBin),
+ Content = build_content(Properties, Body),
{ok, Msg} = message(ExchangeName, RoutingKey, Content),
Msg.
@@ -163,32 +159,31 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) ->
- publish(ExchangeName, RoutingKeyBin, false, false, none, Properties,
+publish(Exchange, RoutingKeyBin, Properties, BodyBin) ->
+ publish(Exchange, RoutingKeyBin, false, false, none, Properties,
BodyBin).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
+publish(Exchange, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
BodyBin) ->
- publish(delivery(Mandatory, Immediate, Txn,
- message(ExchangeName, RoutingKeyBin,
- properties(Properties), BodyBin),
- undefined)).
-
-%% It's faster if you already have an exchange and a message not to
-%% look up the exchange and disassemble and reassemble fragments
-republish(X, RoutingKey, Props, PFR) ->
- republish(X, RoutingKey, false, false, none, Props, PFR).
-
-%% It's faster if you already have an exchange and a message not to
-%% look up the exchange and disassemble and reassemble fragments
-republish(X = #exchange{name = XName},
- RoutingKey, Mandatory, Immediate, Txn, Props, PFR) ->
- {ok, Msg} = message(XName, RoutingKey, build_content(Props, PFR)),
- Delivery = delivery(Mandatory, Immediate, Txn, Msg, undefined),
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
- {ok, RoutingRes, DeliveredQPids}.
+ case exchange(Exchange) of
+ X = #exchange{} ->
+ publish(delivery(Mandatory, Immediate, Txn,
+ message(X#exchange.name, RoutingKeyBin,
+ properties(Properties), BodyBin),
+ undefined));
+ _ ->
+ {ok, unroutable, []}
+ end.
+
+exchange(X = #exchange{}) ->
+ X;
+exchange(N = #resource{kind = exchange}) ->
+ case rabbit_exchange:lookup(N) of
+ {ok, X} -> X;
+ Err -> Err
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index e0681f15..cf8ee64c 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -27,7 +27,7 @@
-ifdef(use_specs).
--type(state() :: rabbit_exchange:name() | 'none').
+-type(state() :: rabbit_types:exchange() | 'none').
-spec(init/1 :: (rabbit_types:vhost()) -> state()).
-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
@@ -86,7 +86,7 @@ maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
maybe_trace(X, Msg = #basic_message{content = #content{
payload_fragments_rev = PFR}},
RKPrefix, RKSuffix, Extra) ->
- {ok, _, _} = rabbit_basic:republish(
+ {ok, _, _} = rabbit_basic:publish(
X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
#'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
ok.