summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-06 09:32:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-06 09:32:55 +0100
commit5e6b11369cd3a4c35f1681cdf3bda33c521cb5d0 (patch)
tree59c8c6d8ea1471ad6ffcbfbeb80f69b3cb84e8e6
parent7737115093a8d5be5a67f09baa204550977cdd07 (diff)
downloadrabbitmq-server-5e6b11369cd3a4c35f1681cdf3bda33c521cb5d0.tar.gz
tweak 'publish' funs to make their relationship more obvious
plus some cosmetic renaming for consistency
-rw-r--r--src/rabbit_basic.erl74
1 files changed, 36 insertions, 38 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 9cc406e7..b266d366 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/4]).
--export([publish/4, publish/6]).
+-export([publish/4, publish/6, publish/1,
+ message/3, message/4, properties/1, delivery/4]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -35,6 +35,12 @@
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
+-spec(publish/4 ::
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) -> publish_result()).
+-spec(publish/6 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+ properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/4 ::
@@ -49,12 +55,6 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
--spec(publish/4 ::
- (exchange_input(), rabbit_router:routing_key(), properties_input(),
- body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -64,13 +64,34 @@
%%----------------------------------------------------------------------------
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+
+%% Convenience function, for avoiding round-trips in calls across the
+%% erlang distributed network.
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined));
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(delivery(Mandatory, Immediate,
+ message(XName, RKey, properties(Props), Body),
+ undefined)).
+
publish(Delivery = #delivery{
- message = #basic_message{exchange_name = ExchangeName}}) ->
- case rabbit_exchange:lookup(ExchangeName) of
+ message = #basic_message{exchange_name = XName}}) ->
+ case rabbit_exchange:lookup(XName) of
{ok, X} -> publish(X, Delivery);
- Other -> Other
+ Err -> Err
end.
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
@@ -113,11 +134,10 @@ strip_header(#content{properties = Props = #'P_basic'{headers = Headers}}
headers = Headers0}})
end.
-message(ExchangeName, RoutingKey,
- #content{properties = Props} = DecodedContent) ->
+message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
try
{ok, #basic_message{
- exchange_name = ExchangeName,
+ exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
id = rabbit_guid:guid(),
is_persistent = is_message_persistent(DecodedContent),
@@ -127,10 +147,10 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, Body) ->
+message(XName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
Content = build_content(Properties, Body),
- {ok, Msg} = message(ExchangeName, RoutingKey, Content),
+ {ok, Msg} = message(XName, RoutingKey, Content),
Msg.
properties(P = #'P_basic'{}) ->
@@ -152,28 +172,6 @@ indexof([], _Element, _N) -> 0;
indexof([Element | _Rest], Element, N) -> N;
indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
-
-%% Convenience function, for avoiding round-trips in calls across the
-%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
- Err -> Err
- end.
-
-publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
- {ok, RoutingRes, DeliveredQPids}.
-
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of