diff options
-rw-r--r-- | src/rabbit_basic.erl | 30 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 32 |
2 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3cf73e80..355e390e 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,7 @@ -include("rabbit_framing.hrl"). -export([publish/1, message/3, message/4, properties/1, delivery/5]). --export([publish/4, publish/7]). +-export([publish/4, publish/7, republish/4, republish/7]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -54,6 +54,13 @@ (rabbit_exchange:name(), rabbit_router:routing_key(), boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), properties_input(), binary()) -> publish_result()). +-spec(republish/4 :: + (rabbit_types:exchange(), rabbit_router:routing_key(), + properties_input(), [binary()]) -> publish_result()). +-spec(republish/7 :: + (rabbit_types:exchange(), rabbit_router:routing_key(), + boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), + properties_input(), [binary()]) -> publish_result()). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -77,7 +84,10 @@ delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. -build_content(Properties, BodyBin) -> +build_content(Properties, BodyBin) when is_binary(BodyBin) -> + build_content(Properties, [BodyBin]); + +build_content(Properties, PFR) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), @@ -85,7 +95,7 @@ build_content(Properties, BodyBin) -> properties = Properties, properties_bin = none, protocol = none, - payload_fragments_rev = [BodyBin]}. + payload_fragments_rev = PFR}. from_content(Content) -> #content{class_id = ClassId, @@ -166,6 +176,20 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, properties(Properties), BodyBin), undefined)). +%% It's faster if you already have an exchange and a message not to +%% look up the exchange and disassemble and reassemble fragments +republish(X, RoutingKey, Props, PFR) -> + republish(X, RoutingKey, false, false, none, Props, PFR). + +%% It's faster if you already have an exchange and a message not to +%% look up the exchange and disassemble and reassemble fragments +republish(X = #exchange{name = XName}, + RoutingKey, Mandatory, Immediate, Txn, Props, PFR) -> + {ok, Msg} = message(XName, RoutingKey, build_content(Props, PFR)), + Delivery = delivery(Mandatory, Immediate, Txn, Msg, undefined), + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}. + is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> case Mode of diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 6228c54e..e0681f15 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -44,17 +44,21 @@ init(VHost) -> {ok, XNs} = application:get_env(rabbit, ?TRACE_EXCHANGES), case proplists:get_value(VHost, XNs, none) of none -> none; - Name -> rabbit_misc:r(VHost, exchange, Name) + Name -> case rabbit_exchange:lookup( + rabbit_misc:r(VHost, exchange, Name)) of + {ok, X} -> X; + _ -> none + end end. tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, - TraceXN) -> - maybe_trace(TraceXN, Msg, <<"publish">>, XName, []). + TraceX) -> + maybe_trace(TraceX, Msg, <<"publish">>, XName, []). tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, - TraceXN) -> + TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - maybe_trace(TraceXN, Msg, <<"deliver">>, QName, + maybe_trace(TraceX, Msg, <<"deliver">>, QName, [{<<"redelivered">>, signedint, RedeliveredNum}]). %%---------------------------------------------------------------------------- @@ -76,20 +80,16 @@ update_config(Fun) -> maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) -> ok; -maybe_trace(XName, #basic_message{exchange_name = #resource{name = XName}}, +maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, _RKPrefix, _RKSuffix, _Extra) -> ok; -maybe_trace(XName, Msg = #basic_message{content = #content{ - payload_fragments_rev = PFR}}, +maybe_trace(X, Msg = #basic_message{content = #content{ + payload_fragments_rev = PFR}}, RKPrefix, RKSuffix, Extra) -> - case rabbit_basic:publish(XName, - <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, - list_to_binary(lists:reverse(PFR))) of - {ok, _, _} -> ok; - {error, not_found} -> rabbit_log:info("trace ~s not found~n", - [rabbit_misc:rs(XName)]) - end. + {ok, _, _} = rabbit_basic:republish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), + ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, routing_keys = RoutingKeys, |