diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-05-09 23:26:42 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-05-09 23:26:42 +0100 |
commit | 18443fcb146e270af9116bb6deff56068746c384 (patch) | |
tree | 3e437641b9074f1a61beba9e77f70e28bb980dc0 | |
parent | 09450cdccb23baaf936a37c045b9875a35fb3fc8 (diff) | |
download | rabbitmq-server-18443fcb146e270af9116bb6deff56068746c384.tar.gz |
cosmetic
-rw-r--r-- | src/rabbit_log.erl | 159 |
1 files changed, 76 insertions, 83 deletions
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 93c01400..87cda552 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -100,70 +100,61 @@ error(Fmt) -> error(Fmt, Args) when is_list(Args) -> gen_server:cast(?SERVER, {error, Fmt, Args}). -tap_trace_in(Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin, - name = XNameBin}}, +tap_trace_in(Message = #basic_message{exchange_name = #resource{ + virtual_host = VHostBin, + name = XNameBin}}, QPids) -> - check_trace(VHostBin, - fun (TraceExchangeBin) -> - QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids], - QNames = [N || [{name, #resource{name = N}}] <- QInfos], - maybe_inject(TraceExchangeBin, - VHostBin, - XNameBin, - <<"publish">>, - XNameBin, - [{<<"queue_names">>, - longstr, - list_to_binary(rabbit_misc:intersperse(",", QNames))}, - {<<"message">>, - table, - message_to_table(Message)}]) - end). + check_trace( + VHostBin, + fun (TraceExchangeBin) -> + QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || + P <- QPids], + QNames = [N || [{name, #resource{name = N}}] <- QInfos], + QNamesStr = list_to_binary(rabbit_misc:intersperse(",", QNames)), + EncodedMessage = message_to_table(Message), + maybe_inject(TraceExchangeBin, VHostBin, XNameBin, + <<"publish">>, XNameBin, + [{<<"queue_names">>, longstr, QNamesStr}, + {<<"message">>, table, EncodedMessage}]) + end). tap_trace_out({#resource{name = QNameBin}, _QPid, QMsgId, Redelivered, - Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin, - name = XNameBin}}}, + Message = #basic_message{exchange_name = #resource{ + virtual_host = VHostBin, + name = XNameBin}}}, DeliveryTag, ConsumerTagOrNone) -> - check_trace(VHostBin, - fun (TraceExchangeBin) -> - RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later - {<<"queue_msg_number">>, signedint, QMsgId}, - {<<"redelivered">>, signedint, RedeliveredNum}, - {<<"message">>, table, message_to_table(Message)}], - Fields = case ConsumerTagOrNone of - none -> Fields0; - ConsumerTag -> [{<<"consumer_tag">>, longstr, ConsumerTag} - | Fields0] - end, - maybe_inject(TraceExchangeBin, - VHostBin, - XNameBin, - <<"deliver">>, - QNameBin, - Fields) - end). + check_trace( + VHostBin, + fun (TraceExchangeBin) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + EncodedMessage = message_to_table(Message), + Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later + {<<"queue_msg_number">>, signedint, QMsgId}, + {<<"redelivered">>, signedint, RedeliveredNum}, + {<<"message">>, table, EncodedMessage}], + Fields = case ConsumerTagOrNone of + none -> + Fields0; + ConsumerTag -> + [{<<"consumer_tag">>, longstr, ConsumerTag} + | Fields0] + end, + maybe_inject(TraceExchangeBin, VHostBin, XNameBin, + <<"deliver">>, QNameBin, Fields) + end). check_trace(VHostBin, F) -> case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of - undefined -> - ok; - {ok, TraceExchangeBin} -> - F(TraceExchangeBin) + undefined -> ok; + {ok, TraceExchangeBin} -> F(TraceExchangeBin) end of - {'EXIT', Reason} -> - info("Trace tap died with reason ~p~n", [Reason]); - ok -> - ok + {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]); + ok -> ok end. -maybe_inject(TraceExchangeBin, - VHostBin, - OriginalExchangeBin, - RKPrefix, - RKSuffix, - Table) -> +maybe_inject(TraceExchangeBin, VHostBin, OriginalExchangeBin, + RKPrefix, RKSuffix, Table) -> if TraceExchangeBin =:= OriginalExchangeBin -> ok; @@ -181,37 +172,39 @@ maybe_inject(TraceExchangeBin, message_to_table(#basic_message{exchange_name = #resource{name = XName}, routing_key = RoutingKey, content = Content}) -> - #content{properties = Props, - payload_fragments_rev = PFR} = rabbit_binary_parser:ensure_content_decoded(Content), - #'P_basic'{content_type = ContentType, - content_encoding = ContentEncoding, - headers = Headers, - delivery_mode = DeliveryMode, - priority = Priority, - correlation_id = CorrelationId, - reply_to = ReplyTo, - expiration = Expiration, - message_id = MessageId, - timestamp = Timestamp, - type = Type, - user_id = UserId, - app_id = AppId} = Props, + #content{properties = #'P_basic'{content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId}, + payload_fragments_rev = PFR} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers = prune_undefined( + [{<<"content_type">>, longstr, ContentType}, + {<<"content_encoding">>, longstr, ContentEncoding}, + {<<"headers">>, table, Headers}, + {<<"delivery_mode">>, signedint, DeliveryMode}, + {<<"priority">>, signedint, Priority}, + {<<"correlation_id">>, longstr, CorrelationId}, + {<<"reply_to">>, longstr, ReplyTo}, + {<<"expiration">>, longstr, Expiration}, + {<<"message_id">>, longstr, MessageId}, + {<<"timestamp">>, longstr, Timestamp}, + {<<"type">>, longstr, Type}, + {<<"user_id">>, longstr, UserId}, + {<<"app_id">>, longstr, AppId}]), [{<<"exchange_name">>, longstr, XName}, - {<<"routing_key">>, longstr, RoutingKey}, - {<<"headers">>, table, prune_undefined([{<<"content_type">>, longstr, ContentType}, - {<<"content_encoding">>, longstr, ContentEncoding}, - {<<"headers">>, table, Headers}, - {<<"delivery_mode">>, signedint, DeliveryMode}, - {<<"priority">>, signedint, Priority}, - {<<"correlation_id">>, longstr, CorrelationId}, - {<<"reply_to">>, longstr, ReplyTo}, - {<<"expiration">>, longstr, Expiration}, - {<<"message_id">>, longstr, MessageId}, - {<<"timestamp">>, longstr, Timestamp}, - {<<"type">>, longstr, Type}, - {<<"user_id">>, longstr, UserId}, - {<<"app_id">>, longstr, AppId}])}, - {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}]. + {<<"routing_key">>, longstr, RoutingKey}, + {<<"headers">>, table, Headers}, + {<<"body">>, longstr, list_to_binary(lists:reverse(PFR))}]. prune_undefined(Fields) -> [F || F = {_, _, Value} <- Fields, |