diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-05-10 16:33:46 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-05-10 16:33:46 +0100 |
commit | 7e3713e13764ccba3e9edc76066f178981c771bf (patch) | |
tree | cfbffa96b1007cc139759563d31de7af0452a445 /src | |
parent | 8dff232b17a8a50a617df3b4d4ccca3a890365c3 (diff) | |
parent | 8d6b67929bf5dcfbfad62c3e1fd232056a99c3f5 (diff) | |
download | rabbitmq-server-7e3713e13764ccba3e9edc76066f178981c771bf.tar.gz |
Merge bug20589 into default
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
-rw-r--r-- | src/rabbit_control.erl | 18 | ||||
-rw-r--r-- | src/rabbit_trace.erl | 101 |
3 files changed, 122 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 083c5963..f0788862 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -281,6 +281,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), + rabbit_trace:tap_trace_out(Msg), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -607,6 +608,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> + rabbit_trace:tap_trace_in(Message), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -675,6 +677,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), + rabbit_trace:tap_trace_out(Msg), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1af91f4c..6ab07111 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -282,6 +282,19 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; +action(set_env, Node, [Var, Term], _Opts, Inform) -> + Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]), + rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]); + +action(get_env, Node, [Var], _Opts, Inform) -> + Inform("Getting control variable ~s for node ~p", [Var, Node]), + Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]), + io:format("~p~n", [Val]); + +action(unset_env, Node, [Var], _Opts, Inform) -> + Inform("Clearing control variable ~s for node ~p", [Var, Node]), + rpc_call(Node, application, unset_env, [rabbit, parse(Var)]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), @@ -325,6 +338,11 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. +parse(Str) -> + {ok, Tokens, _} = erl_scan:string(Str ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl new file mode 100644 index 00000000..2d15e7fc --- /dev/null +++ b/src/rabbit_trace.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_trace). + +-export([tap_trace_in/1, tap_trace_out/1]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). +-spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +tap_trace_in(Msg) -> + maybe_trace(Msg, <<"publish">>, xname(Msg), []). + +tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + maybe_trace(Msg, <<"deliver">>, QName, + [{<<"redelivered">>, signedint, RedeliveredNum}]). + +xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName. +vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost. + +maybe_trace(Msg, RKPrefix, RKSuffix, Extra) -> + XName = xname(Msg), + case trace_exchange(vhost(Msg)) of + none -> ok; + XName -> ok; + TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of + {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]); + ok -> ok + end + end. + +trace_exchange(VHost) -> + case application:get_env(rabbit, trace_exchanges) of + undefined -> none; + {ok, Xs} -> proplists:get_value(VHost, Xs, none) + end. + +trace(TraceX, Msg0, RKPrefix, RKSuffix, Extra) -> + Msg = ensure_content_decoded(Msg0), + rabbit_basic:publish(rabbit_misc:r(vhost(Msg), exchange, TraceX), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, + payload(Msg)), + ok. + +msg_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_keys = RoutingKeys, + content = #content{properties = Props}}) -> + {PropsTable, _Ix} = + lists:foldl( + fun (K, {L, Ix}) -> + V = element(Ix, Props), + NewL = case V of + undefined -> L; + _ -> [{a2b(K), type(V), V} | L] + end, + {NewL, Ix + 1} + end, {[], 2}, record_info(fields, 'P_basic')), + [{<<"exchange_name">>, longstr, XName}, + {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]}, + {<<"properties">>, table, PropsTable}, + {<<"node">>, longstr, a2b(node())}]. + +payload(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> + list_to_binary(lists:reverse(PFR)). + +ensure_content_decoded(Msg = #basic_message{content = Content}) -> + Msg#basic_message{content = rabbit_binary_parser:ensure_content_decoded( + Content)}. + +a2b(A) -> + list_to_binary(atom_to_list(A)). + +type(V) when is_list(V) -> table; +type(V) when is_integer(V) -> signedint; +type(_V) -> longstr. |