summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-05 18:04:32 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-05 18:04:32 +0100
commit5657326eff7e29caf48e5f147febecd8f65a3bef (patch)
treed6980dbb3cf13378be732d15d1bb1cd2dfe44f56
parentcde3c81ea4895428f41715766dcc416b37773c37 (diff)
parent18443fcb146e270af9116bb6deff56068746c384 (diff)
downloadrabbitmq-server-5657326eff7e29caf48e5f147febecd8f65a3bef.tar.gz
Merge in default and untested debitrot.
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_control.erl17
-rw-r--r--src/rabbit_log.erl115
3 files changed, 136 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5099bf3f..d8a7e3d6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -280,6 +280,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
+ rabbit_log:tap_trace_out(Msg, DeliveryTag, ConsumerTag),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -611,6 +612,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
State2 = process_routing_result(RoutingRes, DeliveredQPids,
ExchangeName, MsgSeqNo, Message,
State1),
+ %% TODO is this in the right place?
+ rabbit_log:tap_trace_in(Message, DeliveredQPids),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
@@ -671,6 +674,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
+ rabbit_log:tap_trace_out(Msg, DeliveryTag, none),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1af91f4c..1ec36e49 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -282,6 +282,18 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
Other -> Other
end;
+action(set_env, Node, [VarStr, TermStr], _Opts, Inform) ->
+ Inform("Setting control variable ~s for node ~p to ~s", [VarStr, Node, TermStr]),
+ rpc_call(Node, application, set_env, [rabbit, parse_term(VarStr), parse_term(TermStr)]);
+
+action(get_env, Node, [VarStr], _Opts, Inform) ->
+ Inform("Getting control variable ~s for node ~p", [VarStr, Node]),
+ io:format("~p~n", [rpc_call(Node, application, get_env, [rabbit, parse_term(VarStr)])]);
+
+action(unset_env, Node, [VarStr], _Opts, Inform) ->
+ Inform("Clearing control variable ~s for node ~p", [VarStr, Node]),
+ rpc_call(Node, application, unset_env, [rabbit, parse_term(VarStr)]);
+
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
@@ -325,6 +337,11 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
+parse_term(Str) ->
+ {ok, Tokens, _} = erl_scan:string(Str ++ "."),
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ Term.
+
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
fun (Result) -> display_row(
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8207d6bc..075a5243 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -26,6 +26,11 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
+-export([tap_trace_in/2, tap_trace_out/3]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
@@ -77,6 +82,116 @@ error(Fmt) ->
error(Fmt, Args) when is_list(Args) ->
gen_server:cast(?SERVER, {error, Fmt, Args}).
+tap_trace_in(Message = #basic_message{exchange_name = #resource{
+ virtual_host = VHostBin,
+ name = XNameBin}},
+ QPids) ->
+ check_trace(
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) ||
+ P <- QPids],
+ QNames = [N || [{name, #resource{name = N}}] <- QInfos],
+ QNamesStr = list_to_binary(rabbit_misc:intersperse(",", QNames)),
+ EncodedMessage = message_to_table(Message),
+ maybe_inject(TraceExchangeBin, VHostBin, XNameBin,
+ <<"publish">>, XNameBin,
+ [{<<"queue_names">>, longstr, QNamesStr},
+ {<<"message">>, table, EncodedMessage}])
+ end).
+
+tap_trace_out({#resource{name = QNameBin}, _QPid, QMsgId, Redelivered,
+ Message = #basic_message{exchange_name = #resource{
+ virtual_host = VHostBin,
+ name = XNameBin}}},
+ DeliveryTag,
+ ConsumerTagOrNone) ->
+ check_trace(
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ EncodedMessage = message_to_table(Message),
+ Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later
+ {<<"queue_msg_number">>, signedint, QMsgId},
+ {<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"message">>, table, EncodedMessage}],
+ Fields = case ConsumerTagOrNone of
+ none ->
+ Fields0;
+ ConsumerTag ->
+ [{<<"consumer_tag">>, longstr, ConsumerTag}
+ | Fields0]
+ end,
+ maybe_inject(TraceExchangeBin, VHostBin, XNameBin,
+ <<"deliver">>, QNameBin, Fields)
+ end).
+
+check_trace(VHostBin, F) ->
+ case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of
+ undefined -> ok;
+ {ok, TraceExchangeBin} -> F(TraceExchangeBin)
+ end of
+ {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]);
+ ok -> ok
+ end.
+
+maybe_inject(TraceExchangeBin, VHostBin, OriginalExchangeBin,
+ RKPrefix, RKSuffix, Table) ->
+ if
+ TraceExchangeBin =:= OriginalExchangeBin ->
+ ok;
+ true ->
+ rabbit_exchange:simple_publish(
+ false,
+ false,
+ rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
+ <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ <<"application/x-amqp-table; version=0-8">>,
+ rabbit_binary_generator:generate_table(Table)),
+ ok
+ end.
+
+message_to_table(#basic_message{exchange_name = #resource{name = XName},
+ routing_keys = RoutingKeys,
+ content = Content}) ->
+ #content{properties = #'P_basic'{content_type = ContentType,
+ content_encoding = ContentEncoding,
+ headers = Headers,
+ delivery_mode = DeliveryMode,
+ priority = Priority,
+ correlation_id = CorrelationId,
+ reply_to = ReplyTo,
+ expiration = Expiration,
+ message_id = MessageId,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = UserId,
+ app_id = AppId},
+ payload_fragments_rev = PFR} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers = prune_undefined(
+ [{<<"content_type">>, longstr, ContentType},
+ {<<"content_encoding">>, longstr, ContentEncoding},
+ {<<"headers">>, table, Headers},
+ {<<"delivery_mode">>, signedint, DeliveryMode},
+ {<<"priority">>, signedint, Priority},
+ {<<"correlation_id">>, longstr, CorrelationId},
+ {<<"reply_to">>, longstr, ReplyTo},
+ {<<"expiration">>, longstr, Expiration},
+ {<<"message_id">>, longstr, MessageId},
+ {<<"timestamp">>, longstr, Timestamp},
+ {<<"type">>, longstr, Type},
+ {<<"user_id">>, longstr, UserId},
+ {<<"app_id">>, longstr, AppId}]),
+ [{<<"exchange_name">>, longstr, XName},
+ {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]},
+ {<<"headers">>, table, Headers},
+ {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}].
+
+prune_undefined(Fields) ->
+ [F || F = {_, _, Value} <- Fields,
+ Value =/= undefined].
+
%%--------------------------------------------------------------------
init([]) -> {ok, none}.