summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_basic.erl30
-rw-r--r--src/rabbit_trace.erl32
2 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3cf73e80..355e390e 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -19,7 +19,7 @@
-include("rabbit_framing.hrl").
-export([publish/1, message/3, message/4, properties/1, delivery/5]).
--export([publish/4, publish/7]).
+-export([publish/4, publish/7, republish/4, republish/7]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -54,6 +54,13 @@
(rabbit_exchange:name(), rabbit_router:routing_key(),
boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
properties_input(), binary()) -> publish_result()).
+-spec(republish/4 ::
+ (rabbit_types:exchange(), rabbit_router:routing_key(),
+ properties_input(), [binary()]) -> publish_result()).
+-spec(republish/7 ::
+ (rabbit_types:exchange(), rabbit_router:routing_key(),
+ boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
+ properties_input(), [binary()]) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -77,7 +84,10 @@ delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
-build_content(Properties, BodyBin) ->
+build_content(Properties, BodyBin) when is_binary(BodyBin) ->
+ build_content(Properties, [BodyBin]);
+
+build_content(Properties, PFR) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
@@ -85,7 +95,7 @@ build_content(Properties, BodyBin) ->
properties = Properties,
properties_bin = none,
protocol = none,
- payload_fragments_rev = [BodyBin]}.
+ payload_fragments_rev = PFR}.
from_content(Content) ->
#content{class_id = ClassId,
@@ -166,6 +176,20 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
properties(Properties), BodyBin),
undefined)).
+%% It's faster if you already have an exchange and a message not to
+%% look up the exchange and disassemble and reassemble fragments
+republish(X, RoutingKey, Props, PFR) ->
+ republish(X, RoutingKey, false, false, none, Props, PFR).
+
+%% It's faster if you already have an exchange and a message not to
+%% look up the exchange and disassemble and reassemble fragments
+republish(X = #exchange{name = XName},
+ RoutingKey, Mandatory, Immediate, Txn, Props, PFR) ->
+ {ok, Msg} = message(XName, RoutingKey, build_content(Props, PFR)),
+ Delivery = delivery(Mandatory, Immediate, Txn, Msg, undefined),
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
+
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
case Mode of
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 6228c54e..e0681f15 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -44,17 +44,21 @@ init(VHost) ->
{ok, XNs} = application:get_env(rabbit, ?TRACE_EXCHANGES),
case proplists:get_value(VHost, XNs, none) of
none -> none;
- Name -> rabbit_misc:r(VHost, exchange, Name)
+ Name -> case rabbit_exchange:lookup(
+ rabbit_misc:r(VHost, exchange, Name)) of
+ {ok, X} -> X;
+ _ -> none
+ end
end.
tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
- TraceXN) ->
- maybe_trace(TraceXN, Msg, <<"publish">>, XName, []).
+ TraceX) ->
+ maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
- TraceXN) ->
+ TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(TraceXN, Msg, <<"deliver">>, QName,
+ maybe_trace(TraceX, Msg, <<"deliver">>, QName,
[{<<"redelivered">>, signedint, RedeliveredNum}]).
%%----------------------------------------------------------------------------
@@ -76,20 +80,16 @@ update_config(Fun) ->
maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(XName, #basic_message{exchange_name = #resource{name = XName}},
+maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
_RKPrefix, _RKSuffix, _Extra) ->
ok;
-maybe_trace(XName, Msg = #basic_message{content = #content{
- payload_fragments_rev = PFR}},
+maybe_trace(X, Msg = #basic_message{content = #content{
+ payload_fragments_rev = PFR}},
RKPrefix, RKSuffix, Extra) ->
- case rabbit_basic:publish(XName,
- <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = msg_to_table(Msg) ++ Extra},
- list_to_binary(lists:reverse(PFR))) of
- {ok, _, _} -> ok;
- {error, not_found} -> rabbit_log:info("trace ~s not found~n",
- [rabbit_misc:rs(XName)])
- end.
+ {ok, _, _} = rabbit_basic:republish(
+ X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
+ ok.
msg_to_table(#basic_message{exchange_name = #resource{name = XName},
routing_keys = RoutingKeys,