diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-05 18:04:32 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-05 18:04:32 +0100 |
commit | 5657326eff7e29caf48e5f147febecd8f65a3bef (patch) | |
tree | d6980dbb3cf13378be732d15d1bb1cd2dfe44f56 | |
parent | cde3c81ea4895428f41715766dcc416b37773c37 (diff) | |
parent | 18443fcb146e270af9116bb6deff56068746c384 (diff) | |
download | rabbitmq-server-5657326eff7e29caf48e5f147febecd8f65a3bef.tar.gz |
Merge in default and untested debitrot.
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_control.erl | 17 | ||||
-rw-r--r-- | src/rabbit_log.erl | 115 |
3 files changed, 136 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5099bf3f..d8a7e3d6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -280,6 +280,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), + rabbit_log:tap_trace_out(Msg, DeliveryTag, ConsumerTag), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -611,6 +612,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, State2 = process_routing_result(RoutingRes, DeliveredQPids, ExchangeName, MsgSeqNo, Message, State1), + %% TODO is this in the right place? + rabbit_log:tap_trace_in(Message, DeliveredQPids), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), @@ -671,6 +674,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), + rabbit_log:tap_trace_out(Msg, DeliveryTag, none), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1af91f4c..1ec36e49 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -282,6 +282,18 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; +action(set_env, Node, [VarStr, TermStr], _Opts, Inform) -> + Inform("Setting control variable ~s for node ~p to ~s", [VarStr, Node, TermStr]), + rpc_call(Node, application, set_env, [rabbit, parse_term(VarStr), parse_term(TermStr)]); + +action(get_env, Node, [VarStr], _Opts, Inform) -> + Inform("Getting control variable ~s for node ~p", [VarStr, Node]), + io:format("~p~n", [rpc_call(Node, application, get_env, [rabbit, parse_term(VarStr)])]); + +action(unset_env, Node, [VarStr], _Opts, Inform) -> + Inform("Clearing control variable ~s for node ~p", [VarStr, Node]), + rpc_call(Node, application, unset_env, [rabbit, parse_term(VarStr)]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), @@ -325,6 +337,11 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. +parse_term(Str) -> + {ok, Tokens, _} = erl_scan:string(Str ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8207d6bc..075a5243 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -26,6 +26,11 @@ -export([debug/1, debug/2, message/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([tap_trace_in/2, tap_trace_out/3]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + -define(SERVER, ?MODULE). %%---------------------------------------------------------------------------- @@ -77,6 +82,116 @@ error(Fmt) -> error(Fmt, Args) when is_list(Args) -> gen_server:cast(?SERVER, {error, Fmt, Args}). +tap_trace_in(Message = #basic_message{exchange_name = #resource{ + virtual_host = VHostBin, + name = XNameBin}}, + QPids) -> + check_trace( + VHostBin, + fun (TraceExchangeBin) -> + QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || + P <- QPids], + QNames = [N || [{name, #resource{name = N}}] <- QInfos], + QNamesStr = list_to_binary(rabbit_misc:intersperse(",", QNames)), + EncodedMessage = message_to_table(Message), + maybe_inject(TraceExchangeBin, VHostBin, XNameBin, + <<"publish">>, XNameBin, + [{<<"queue_names">>, longstr, QNamesStr}, + {<<"message">>, table, EncodedMessage}]) + end). + +tap_trace_out({#resource{name = QNameBin}, _QPid, QMsgId, Redelivered, + Message = #basic_message{exchange_name = #resource{ + virtual_host = VHostBin, + name = XNameBin}}}, + DeliveryTag, + ConsumerTagOrNone) -> + check_trace( + VHostBin, + fun (TraceExchangeBin) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + EncodedMessage = message_to_table(Message), + Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later + {<<"queue_msg_number">>, signedint, QMsgId}, + {<<"redelivered">>, signedint, RedeliveredNum}, + {<<"message">>, table, EncodedMessage}], + Fields = case ConsumerTagOrNone of + none -> + Fields0; + ConsumerTag -> + [{<<"consumer_tag">>, longstr, ConsumerTag} + | Fields0] + end, + maybe_inject(TraceExchangeBin, VHostBin, XNameBin, + <<"deliver">>, QNameBin, Fields) + end). + +check_trace(VHostBin, F) -> + case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of + undefined -> ok; + {ok, TraceExchangeBin} -> F(TraceExchangeBin) + end of + {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]); + ok -> ok + end. + +maybe_inject(TraceExchangeBin, VHostBin, OriginalExchangeBin, + RKPrefix, RKSuffix, Table) -> + if + TraceExchangeBin =:= OriginalExchangeBin -> + ok; + true -> + rabbit_exchange:simple_publish( + false, + false, + rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + <<"application/x-amqp-table; version=0-8">>, + rabbit_binary_generator:generate_table(Table)), + ok + end. + +message_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_keys = RoutingKeys, + content = Content}) -> + #content{properties = #'P_basic'{content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId}, + payload_fragments_rev = PFR} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers = prune_undefined( + [{<<"content_type">>, longstr, ContentType}, + {<<"content_encoding">>, longstr, ContentEncoding}, + {<<"headers">>, table, Headers}, + {<<"delivery_mode">>, signedint, DeliveryMode}, + {<<"priority">>, signedint, Priority}, + {<<"correlation_id">>, longstr, CorrelationId}, + {<<"reply_to">>, longstr, ReplyTo}, + {<<"expiration">>, longstr, Expiration}, + {<<"message_id">>, longstr, MessageId}, + {<<"timestamp">>, longstr, Timestamp}, + {<<"type">>, longstr, Type}, + {<<"user_id">>, longstr, UserId}, + {<<"app_id">>, longstr, AppId}]), + [{<<"exchange_name">>, longstr, XName}, + {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]}, + {<<"headers">>, table, Headers}, + {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}]. + +prune_undefined(Fields) -> + [F || F = {_, _, Value} <- Fields, + Value =/= undefined]. + %%-------------------------------------------------------------------- init([]) -> {ok, none}. |