diff options
author | Tony Garnock-Jones <tonyg@lshift.net> | 2009-12-14 13:51:55 +0000 |
---|---|---|
committer | Tony Garnock-Jones <tonyg@lshift.net> | 2009-12-14 13:51:55 +0000 |
commit | b5651c40d78ee23ee2413157909dfdc0573d9408 (patch) | |
tree | 7bdf670419745b19c835325839e2677dcdb2fbd5 | |
parent | 8ab62dcf0065b009bb8e82b15dd0feace428b8b8 (diff) | |
parent | b6c4d71e117b13259707214c933fa27c6bbbd70f (diff) | |
download | rabbitmq-server-b5651c40d78ee23ee2413157909dfdc0573d9408.tar.gz |
Merge default into bug22039
-rw-r--r-- | docs/rabbitmqctl.1.pod | 11 | ||||
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_control.erl | 46 | ||||
-rw-r--r-- | src/rabbit_networking.erl | 48 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 50 |
5 files changed, 91 insertions, 66 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 6b420872..5255be28 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used queue arguments -=item node +=item pid -node on which the process associated with the queue resides +id of the Erlang process associated with the queue =item messages_ready @@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed. =item node -node on which the process associated with the connection resides +id of the Erlang process associated with the connection =item address @@ -340,6 +340,11 @@ connection timeout maximum frame size (bytes) +=item client_properties + +informational properties transmitted by the client during connection +establishment + =item recv_oct octets received diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5703d0d6..4b157cbc 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,7 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost}). +-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). -record(content, {class_id, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 19579729..ddd0c002 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results. The default value is \"/\". <QueueInfoItem> must be a member of the list [name, durable, auto_delete, -arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted, +arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted, messages, acks_uncommitted, consumers, transactions, memory]. The default is to display name and (number of) messages. @@ -183,10 +183,10 @@ auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing exchange name, queue name, routing key and arguments, in that order. -<ConnectionInfoItem> must be a member of the list [node, address, port, +<ConnectionInfoItem> must be a member of the list [pid, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, -recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address, peer_port and state. +client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. +The default is to display user, peer_address, peer_port and state. "), halt(1). @@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) -> action(list_queues, Node, Args, Inform) -> Inform("Listing queues", []), {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args), - ArgAtoms = list_replace(node, pid, - default_if_empty(RemainingArgs, [name, messages])), + ArgAtoms = default_if_empty(RemainingArgs, [name, messages]), display_info_list(rpc_call(Node, rabbit_amqqueue, info_all, [VHostArg, ArgAtoms]), ArgAtoms); @@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, - peer_port, state])), + ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -358,12 +355,15 @@ format_info_item(Key, Items) -> is_tuple(Value) -> inet_parse:ntoa(Value); Value when is_pid(Value) -> - atom_to_list(node(Value)); + pid_to_string(Value); Value when is_binary(Value) -> escape(Value); Value when is_atom(Value) -> - escape(atom_to_list(Value)); - Value -> + escape(atom_to_list(Value)); + Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _] + when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> + io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); + Value -> io_lib:format("~w", [Value]) end. @@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) -> %% characters. We don't escape characters above 127, since they may %% form part of UTF-8 strings. -escape(Bin) when binary(Bin) -> +escape(Bin) when is_binary(Bin) -> escape(binary_to_list(Bin)); escape(L) when is_list(L) -> escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); -escape_char([X | T], Acc) when X > 32, X /= 127 -> +escape_char([X | T], Acc) when X >= 32, X /= 127 -> escape_char(T, [X | Acc]); escape_char([X | T], Acc) -> escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), @@ -403,6 +403,20 @@ escape_char([X | T], Acc) -> escape_char([], Acc) -> Acc. -list_replace(Find, Replace, List) -> - [case X of Find -> Replace; _ -> X end || X <- List]. +prettify_amqp_table(Table) -> + [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table]. +prettify_typed_amqp_value(Type, Value) -> + case Type of + longstr -> escape(Value); + table -> prettify_amqp_table(Value); + array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value]; + _ -> Value + end. + +%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7) +pid_to_string(Pid) -> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + = term_to_binary(Pid), + Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), + lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1bc17a32..3a0f9240 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -53,6 +53,9 @@ %% {delay_send, true}, {exit_on_close, false} ]). + +-define(SSL_TIMEOUT, 5). %% seconds + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -160,36 +163,31 @@ node_listeners(Node) -> on_node_down(Node) -> ok = mnesia:dirty_delete(rabbit_listener, Node). -start_client(Sock) -> +start_client(Sock, SockTransform) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), ok = rabbit_net:controlling_process(Sock, Child), - Child ! {go, Sock}, + Child ! {go, Sock, SockTransform}, Child. +start_client(Sock) -> + start_client(Sock, fun (S) -> {ok, S} end). + start_ssl_client(SslOpts, Sock) -> - case rabbit_net:peername(Sock) of - {ok, {PeerAddress, PeerPort}} -> - PeerIp = inet_parse:ntoa(PeerAddress), - case ssl:ssl_accept(Sock, SslOpts) of - {ok, SslSock} -> - rabbit_log:info("upgraded TCP connection " - "from ~s:~p to SSL~n", - [PeerIp, PeerPort]), - RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, - start_client(RabbitSslSock); - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection " - "from ~s:~p to SSL: ~n~p~n", - [PeerIp, PeerPort, Reason]), - {error, Reason} - end; - {error, Reason} -> - gen_tcp:close(Sock), - rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", - [Reason]), - {error, Reason} - end. + start_client( + Sock, + fun (Sock1) -> + case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection ~p to SSL~n", + [self()]), + {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}}; + {error, Reason} -> + {error, {ssl_upgrade_error, Reason}}; + {'EXIT', Reason} -> + {error, {ssl_upgrade_failure, Reason}} + + end + end). connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e21485b5..e78d889d 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max]). + state, channels, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -142,7 +142,8 @@ start_link() -> init(Parent) -> Deb = sys:debug_options([]), receive - {go, Sock} -> start_connection(Parent, Deb, Sock) + {go, Sock, SockTransform} -> + start_connection(Parent, Deb, Sock, SockTransform) end. system_continue(Parent, Deb, State) -> @@ -192,34 +193,35 @@ teardown_profiling(Value) -> inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). -peername(Sock) -> - try - {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), - AddressS = inet_parse:ntoa(Address), - {AddressS, Port} - catch - Ex -> rabbit_log:error("error on TCP connection ~p:~p~n", - [self(), Ex]), - rabbit_log:info("closing TCP connection ~p", [self()]), - exit(normal) +socket_op(Sock, Fun) -> + case Fun(Sock) of + {ok, Res} -> Res; + {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n", + [self(), Reason]), + rabbit_log:info("closing TCP connection ~p~n", + [self()]), + exit(normal) end. -start_connection(Parent, Deb, ClientSock) -> +start_connection(Parent, Deb, Sock, SockTransform) -> process_flag(trap_exit, true), - {PeerAddressS, PeerPort} = peername(ClientSock), + {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1), + PeerAddressS = inet_parse:ntoa(PeerAddress), + rabbit_log:info("starting TCP connection ~p from ~s:~p~n", + [self(), PeerAddressS, PeerPort]), + ClientSock = socket_op(Sock, SockTransform), + erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), + handshake_timeout), ProfilingValue = setup_profiling(), try - rabbit_log:info("starting TCP connection ~p from ~s:~p~n", - [self(), PeerAddressS, PeerPort]), - erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), - handshake_timeout), mainloop(Parent, Deb, switch_callback( #v1{sock = ClientSock, connection = #connection{ user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, - vhost = none}, + vhost = none, + client_properties = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init}, @@ -558,7 +560,8 @@ handle_method0(MethodName, FieldsBin, State) -> end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, - response = Response}, + response = Response, + client_properties = ClientProperties}, State = #v1{connection_state = starting, connection = Connection, sock = Sock}) -> @@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, frame_max = 131072, heartbeat = 0}), State#v1{connection_state = tuning, - connection = Connection#connection{user = User}}; + connection = Connection#connection{ + user = User, + client_properties = ClientProperties}}; handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, frame_max = FrameMax, heartbeat = ClientHeartbeat}, @@ -689,6 +694,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> Timeout; i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) -> FrameMax; +i(client_properties, #v1{connection = #connection{ + client_properties = ClientProperties}}) -> + ClientProperties; i(Item, #v1{}) -> throw({bad_argument, Item}). |