summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@lshift.net>2009-12-14 13:51:55 +0000
committerTony Garnock-Jones <tonyg@lshift.net>2009-12-14 13:51:55 +0000
commitb5651c40d78ee23ee2413157909dfdc0573d9408 (patch)
tree7bdf670419745b19c835325839e2677dcdb2fbd5
parent8ab62dcf0065b009bb8e82b15dd0feace428b8b8 (diff)
parentb6c4d71e117b13259707214c933fa27c6bbbd70f (diff)
downloadrabbitmq-server-b5651c40d78ee23ee2413157909dfdc0573d9408.tar.gz
Merge default into bug22039
-rw-r--r--docs/rabbitmqctl.1.pod11
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_control.erl46
-rw-r--r--src/rabbit_networking.erl48
-rw-r--r--src/rabbit_reader.erl50
5 files changed, 91 insertions, 66 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 6b420872..5255be28 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -198,9 +198,9 @@ whether the queue will be deleted when no longer used
queue arguments
-=item node
+=item pid
-node on which the process associated with the queue resides
+id of the Erlang process associated with the queue
=item messages_ready
@@ -297,7 +297,7 @@ I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=item node
-node on which the process associated with the connection resides
+id of the Erlang process associated with the connection
=item address
@@ -340,6 +340,11 @@ connection timeout
maximum frame size (bytes)
+=item client_properties
+
+informational properties transmitted by the client during connection
+establishment
+
=item recv_oct
octets received
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 5703d0d6..4b157cbc 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -36,7 +36,7 @@
-record(vhost, {virtual_host, dummy}).
--record(connection, {user, timeout_sec, frame_max, vhost}).
+-record(connection, {user, timeout_sec, frame_max, vhost, client_properties}).
-record(content,
{class_id,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 19579729..ddd0c002 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -173,7 +173,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -183,10 +183,10 @@ auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, queue name, routing key and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [node, address, port,
+<ConnectionInfoItem> must be a member of the list [pid, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
-recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address, peer_port and state.
+client_properties, recv_oct, recv_cnt, send_oct, send_cnt, send_pend].
+The default is to display user, peer_address, peer_port and state.
"),
halt(1).
@@ -268,8 +268,7 @@ action(list_user_permissions, Node, Args = [_Username], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(RemainingArgs, [name, messages])),
+ ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -294,9 +293,7 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address,
- peer_port, state])),
+ ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port, state]),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -358,12 +355,15 @@ format_info_item(Key, Items) ->
is_tuple(Value) ->
inet_parse:ntoa(Value);
Value when is_pid(Value) ->
- atom_to_list(node(Value));
+ pid_to_string(Value);
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
- escape(atom_to_list(Value));
- Value ->
+ escape(atom_to_list(Value));
+ Value = [{TableEntryKey, TableEntryType, _TableEntryValue} | _]
+ when is_binary(TableEntryKey) andalso is_atom(TableEntryType) ->
+ io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]);
+ Value ->
io_lib:format("~w", [Value])
end.
@@ -388,14 +388,14 @@ rpc_call(Node, Mod, Fun, Args) ->
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
-escape(Bin) when binary(Bin) ->
+escape(Bin) when is_binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
-escape_char([X | T], Acc) when X > 32, X /= 127 ->
+escape_char([X | T], Acc) when X >= 32, X /= 127 ->
escape_char(T, [X | Acc]);
escape_char([X | T], Acc) ->
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
@@ -403,6 +403,20 @@ escape_char([X | T], Acc) ->
escape_char([], Acc) ->
Acc.
-list_replace(Find, Replace, List) ->
- [case X of Find -> Replace; _ -> X end || X <- List].
+prettify_amqp_table(Table) ->
+ [{escape(K), prettify_typed_amqp_value(T, V)} || {K, T, V} <- Table].
+prettify_typed_amqp_value(Type, Value) ->
+ case Type of
+ longstr -> escape(Value);
+ table -> prettify_amqp_table(Value);
+ array -> [prettify_typed_amqp_value(T, V) || {T, V} <- Value];
+ _ -> Value
+ end.
+
+%% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and 8.7)
+pid_to_string(Pid) ->
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1bc17a32..3a0f9240 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -53,6 +53,9 @@
%% {delay_send, true},
{exit_on_close, false}
]).
+
+-define(SSL_TIMEOUT, 5). %% seconds
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -160,36 +163,31 @@ node_listeners(Node) ->
on_node_down(Node) ->
ok = mnesia:dirty_delete(rabbit_listener, Node).
-start_client(Sock) ->
+start_client(Sock, SockTransform) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Child),
- Child ! {go, Sock},
+ Child ! {go, Sock, SockTransform},
Child.
+start_client(Sock) ->
+ start_client(Sock, fun (S) -> {ok, S} end).
+
start_ssl_client(SslOpts, Sock) ->
- case rabbit_net:peername(Sock) of
- {ok, {PeerAddress, PeerPort}} ->
- PeerIp = inet_parse:ntoa(PeerAddress),
- case ssl:ssl_accept(Sock, SslOpts) of
- {ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection "
- "from ~s:~p to SSL~n",
- [PeerIp, PeerPort]),
- RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
- start_client(RabbitSslSock);
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection "
- "from ~s:~p to SSL: ~n~p~n",
- [PeerIp, PeerPort, Reason]),
- {error, Reason}
- end;
- {error, Reason} ->
- gen_tcp:close(Sock),
- rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n",
- [Reason]),
- {error, Reason}
- end.
+ start_client(
+ Sock,
+ fun (Sock1) ->
+ case catch ssl:ssl_accept(Sock1, SslOpts, ?SSL_TIMEOUT * 1000) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection ~p to SSL~n",
+ [self()]),
+ {ok, #ssl_socket{tcp = Sock1, ssl = SslSock}};
+ {error, Reason} ->
+ {error, {ssl_upgrade_error, Reason}};
+ {'EXIT', Reason} ->
+ {error, {ssl_upgrade_failure, Reason}}
+
+ end
+ end).
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e21485b5..e78d889d 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,7 +58,7 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max]).
+ state, channels, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -142,7 +142,8 @@ start_link() ->
init(Parent) ->
Deb = sys:debug_options([]),
receive
- {go, Sock} -> start_connection(Parent, Deb, Sock)
+ {go, Sock, SockTransform} ->
+ start_connection(Parent, Deb, Sock, SockTransform)
end.
system_continue(Parent, Deb, State) ->
@@ -192,34 +193,35 @@ teardown_profiling(Value) ->
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-peername(Sock) ->
- try
- {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
- AddressS = inet_parse:ntoa(Address),
- {AddressS, Port}
- catch
- Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Ex]),
- rabbit_log:info("closing TCP connection ~p", [self()]),
- exit(normal)
+socket_op(Sock, Fun) ->
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Reason]),
+ rabbit_log:info("closing TCP connection ~p~n",
+ [self()]),
+ exit(normal)
end.
-start_connection(Parent, Deb, ClientSock) ->
+start_connection(Parent, Deb, Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddressS, PeerPort} = peername(ClientSock),
+ {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
+ PeerAddressS = inet_parse:ntoa(PeerAddress),
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
+ ClientSock = socket_op(Sock, SockTransform),
+ erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
+ handshake_timeout),
ProfilingValue = setup_profiling(),
try
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
- erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
- handshake_timeout),
mainloop(Parent, Deb, switch_callback(
#v1{sock = ClientSock,
connection = #connection{
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
- vhost = none},
+ vhost = none,
+ client_properties = none},
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init},
@@ -558,7 +560,8 @@ handle_method0(MethodName, FieldsBin, State) ->
end.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
- response = Response},
+ response = Response,
+ client_properties = ClientProperties},
State = #v1{connection_state = starting,
connection = Connection,
sock = Sock}) ->
@@ -570,7 +573,9 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
frame_max = 131072,
heartbeat = 0}),
State#v1{connection_state = tuning,
- connection = Connection#connection{user = User}};
+ connection = Connection#connection{
+ user = User,
+ client_properties = ClientProperties}};
handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
frame_max = FrameMax,
heartbeat = ClientHeartbeat},
@@ -689,6 +694,9 @@ i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
Timeout;
i(frame_max, #v1{connection = #connection{frame_max = FrameMax}}) ->
FrameMax;
+i(client_properties, #v1{connection = #connection{
+ client_properties = ClientProperties}}) ->
+ ClientProperties;
i(Item, #v1{}) ->
throw({bad_argument, Item}).