summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-09 23:26:42 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-09 23:26:42 +0100
commit18443fcb146e270af9116bb6deff56068746c384 (patch)
tree3e437641b9074f1a61beba9e77f70e28bb980dc0
parent09450cdccb23baaf936a37c045b9875a35fb3fc8 (diff)
downloadrabbitmq-server-18443fcb146e270af9116bb6deff56068746c384.tar.gz
cosmetic
-rw-r--r--src/rabbit_log.erl159
1 files changed, 76 insertions, 83 deletions
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 93c01400..87cda552 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -100,70 +100,61 @@ error(Fmt) ->
error(Fmt, Args) when is_list(Args) ->
gen_server:cast(?SERVER, {error, Fmt, Args}).
-tap_trace_in(Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin,
- name = XNameBin}},
+tap_trace_in(Message = #basic_message{exchange_name = #resource{
+ virtual_host = VHostBin,
+ name = XNameBin}},
QPids) ->
- check_trace(VHostBin,
- fun (TraceExchangeBin) ->
- QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids],
- QNames = [N || [{name, #resource{name = N}}] <- QInfos],
- maybe_inject(TraceExchangeBin,
- VHostBin,
- XNameBin,
- <<"publish">>,
- XNameBin,
- [{<<"queue_names">>,
- longstr,
- list_to_binary(rabbit_misc:intersperse(",", QNames))},
- {<<"message">>,
- table,
- message_to_table(Message)}])
- end).
+ check_trace(
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) ||
+ P <- QPids],
+ QNames = [N || [{name, #resource{name = N}}] <- QInfos],
+ QNamesStr = list_to_binary(rabbit_misc:intersperse(",", QNames)),
+ EncodedMessage = message_to_table(Message),
+ maybe_inject(TraceExchangeBin, VHostBin, XNameBin,
+ <<"publish">>, XNameBin,
+ [{<<"queue_names">>, longstr, QNamesStr},
+ {<<"message">>, table, EncodedMessage}])
+ end).
tap_trace_out({#resource{name = QNameBin}, _QPid, QMsgId, Redelivered,
- Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin,
- name = XNameBin}}},
+ Message = #basic_message{exchange_name = #resource{
+ virtual_host = VHostBin,
+ name = XNameBin}}},
DeliveryTag,
ConsumerTagOrNone) ->
- check_trace(VHostBin,
- fun (TraceExchangeBin) ->
- RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later
- {<<"queue_msg_number">>, signedint, QMsgId},
- {<<"redelivered">>, signedint, RedeliveredNum},
- {<<"message">>, table, message_to_table(Message)}],
- Fields = case ConsumerTagOrNone of
- none -> Fields0;
- ConsumerTag -> [{<<"consumer_tag">>, longstr, ConsumerTag}
- | Fields0]
- end,
- maybe_inject(TraceExchangeBin,
- VHostBin,
- XNameBin,
- <<"deliver">>,
- QNameBin,
- Fields)
- end).
+ check_trace(
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ EncodedMessage = message_to_table(Message),
+ Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later
+ {<<"queue_msg_number">>, signedint, QMsgId},
+ {<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"message">>, table, EncodedMessage}],
+ Fields = case ConsumerTagOrNone of
+ none ->
+ Fields0;
+ ConsumerTag ->
+ [{<<"consumer_tag">>, longstr, ConsumerTag}
+ | Fields0]
+ end,
+ maybe_inject(TraceExchangeBin, VHostBin, XNameBin,
+ <<"deliver">>, QNameBin, Fields)
+ end).
check_trace(VHostBin, F) ->
case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of
- undefined ->
- ok;
- {ok, TraceExchangeBin} ->
- F(TraceExchangeBin)
+ undefined -> ok;
+ {ok, TraceExchangeBin} -> F(TraceExchangeBin)
end of
- {'EXIT', Reason} ->
- info("Trace tap died with reason ~p~n", [Reason]);
- ok ->
- ok
+ {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]);
+ ok -> ok
end.
-maybe_inject(TraceExchangeBin,
- VHostBin,
- OriginalExchangeBin,
- RKPrefix,
- RKSuffix,
- Table) ->
+maybe_inject(TraceExchangeBin, VHostBin, OriginalExchangeBin,
+ RKPrefix, RKSuffix, Table) ->
if
TraceExchangeBin =:= OriginalExchangeBin ->
ok;
@@ -181,37 +172,39 @@ maybe_inject(TraceExchangeBin,
message_to_table(#basic_message{exchange_name = #resource{name = XName},
routing_key = RoutingKey,
content = Content}) ->
- #content{properties = Props,
- payload_fragments_rev = PFR} = rabbit_binary_parser:ensure_content_decoded(Content),
- #'P_basic'{content_type = ContentType,
- content_encoding = ContentEncoding,
- headers = Headers,
- delivery_mode = DeliveryMode,
- priority = Priority,
- correlation_id = CorrelationId,
- reply_to = ReplyTo,
- expiration = Expiration,
- message_id = MessageId,
- timestamp = Timestamp,
- type = Type,
- user_id = UserId,
- app_id = AppId} = Props,
+ #content{properties = #'P_basic'{content_type = ContentType,
+ content_encoding = ContentEncoding,
+ headers = Headers,
+ delivery_mode = DeliveryMode,
+ priority = Priority,
+ correlation_id = CorrelationId,
+ reply_to = ReplyTo,
+ expiration = Expiration,
+ message_id = MessageId,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = UserId,
+ app_id = AppId},
+ payload_fragments_rev = PFR} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers = prune_undefined(
+ [{<<"content_type">>, longstr, ContentType},
+ {<<"content_encoding">>, longstr, ContentEncoding},
+ {<<"headers">>, table, Headers},
+ {<<"delivery_mode">>, signedint, DeliveryMode},
+ {<<"priority">>, signedint, Priority},
+ {<<"correlation_id">>, longstr, CorrelationId},
+ {<<"reply_to">>, longstr, ReplyTo},
+ {<<"expiration">>, longstr, Expiration},
+ {<<"message_id">>, longstr, MessageId},
+ {<<"timestamp">>, longstr, Timestamp},
+ {<<"type">>, longstr, Type},
+ {<<"user_id">>, longstr, UserId},
+ {<<"app_id">>, longstr, AppId}]),
[{<<"exchange_name">>, longstr, XName},
- {<<"routing_key">>, longstr, RoutingKey},
- {<<"headers">>, table, prune_undefined([{<<"content_type">>, longstr, ContentType},
- {<<"content_encoding">>, longstr, ContentEncoding},
- {<<"headers">>, table, Headers},
- {<<"delivery_mode">>, signedint, DeliveryMode},
- {<<"priority">>, signedint, Priority},
- {<<"correlation_id">>, longstr, CorrelationId},
- {<<"reply_to">>, longstr, ReplyTo},
- {<<"expiration">>, longstr, Expiration},
- {<<"message_id">>, longstr, MessageId},
- {<<"timestamp">>, longstr, Timestamp},
- {<<"type">>, longstr, Type},
- {<<"user_id">>, longstr, UserId},
- {<<"app_id">>, longstr, AppId}])},
- {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}].
+ {<<"routing_key">>, longstr, RoutingKey},
+ {<<"headers">>, table, Headers},
+ {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}].
prune_undefined(Fields) ->
[F || F = {_, _, Value} <- Fields,