summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_direct_connection.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/amqp_client/src/amqp_direct_connection.erl')
-rw-r--r--deps/amqp_client/src/amqp_direct_connection.erl232
1 files changed, 232 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl
new file mode 100644
index 0000000000..a07c67074e
--- /dev/null
+++ b/deps/amqp_client/src/amqp_direct_connection.erl
@@ -0,0 +1,232 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @private
+-module(amqp_direct_connection).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(amqp_gen_connection).
+
+-export([server_close/3]).
+
+-export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
+ info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
+
+-export([socket_adapter_info/2]).
+
+-record(state, {node,
+ user,
+ vhost,
+ params,
+ adapter_info,
+ collector,
+ closing_reason, %% undefined | Reason
+ connected_at
+ }).
+
+-define(INFO_KEYS, [type]).
+
+-define(CREATION_EVENT_KEYS, [pid, protocol, host, port, name,
+ peer_host, peer_port,
+ user, vhost, client_properties, type,
+ connected_at, node, user_who_performed_action]).
+
+%%---------------------------------------------------------------------------
+
+%% amqp_connection:close() logically closes from the client end. We may
+%% want to close from the server end.
+server_close(ConnectionPid, Code, Text) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ amqp_gen_connection:server_close(ConnectionPid, Close).
+
+init() ->
+ {ok, #state{}}.
+
+open_channel_args(#state{node = Node,
+ user = User,
+ vhost = VHost,
+ collector = Collector,
+ params = Params}) ->
+ [self(), Node, User, VHost, Collector, Params].
+
+do(_Method, _State) ->
+ ok.
+
+handle_message({force_event_refresh, Ref}, State = #state{node = Node}) ->
+ rpc:call(Node, rabbit_event, notify,
+ [connection_created, connection_info(State), Ref]),
+ {ok, State};
+handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
+ {stop, {closing_timeout, Reason}, State};
+handle_message({'DOWN', _MRef, process, _ConnSup, shutdown}, State) ->
+ {stop, {shutdown, node_down}, State};
+handle_message({'DOWN', _MRef, process, _ConnSup, Reason}, State) ->
+ {stop, {remote_node_down, Reason}, State};
+handle_message({'EXIT', Pid, Reason}, State) ->
+ {stop, rabbit_misc:format("stopping because dependent process ~p died: ~p", [Pid, Reason]), State};
+handle_message(Msg, State) ->
+ {stop, {unexpected_msg, Msg}, State}.
+
+closing(_ChannelCloseType, Reason, State) ->
+ {ok, State#state{closing_reason = Reason}}.
+
+channels_terminated(State = #state{closing_reason = Reason,
+ collector = Collector}) ->
+ rabbit_queue_collector:delete_all(Collector),
+ {stop, {shutdown, Reason}, State}.
+
+terminate(_Reason, #state{node = Node} = State) ->
+ rpc:call(Node, rabbit_direct, disconnect,
+ [self(), [{pid, self()},
+ {node, Node},
+ {name, i(name, State)}]]),
+ ok.
+
+i(type, _State) -> direct;
+i(pid, _State) -> self();
+
+%% Mandatory connection parameters
+
+i(node, #state{node = N}) -> N;
+i(user, #state{params = P}) -> P#amqp_params_direct.username;
+i(user_who_performed_action, St) -> i(user, St);
+i(vhost, #state{params = P}) -> P#amqp_params_direct.virtual_host;
+i(client_properties, #state{params = P}) ->
+ P#amqp_params_direct.client_properties;
+i(connected_at, #state{connected_at = T}) -> T;
+
+%%
+%% Optional adapter info
+%%
+
+%% adapter_info can be undefined e.g. when we were
+%% not granted access to a vhost
+i(_Key, #state{adapter_info = undefined}) -> unknown;
+i(protocol, #state{adapter_info = I}) -> I#amqp_adapter_info.protocol;
+i(host, #state{adapter_info = I}) -> I#amqp_adapter_info.host;
+i(port, #state{adapter_info = I}) -> I#amqp_adapter_info.port;
+i(peer_host, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_host;
+i(peer_port, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_port;
+i(name, #state{adapter_info = I}) -> I#amqp_adapter_info.name;
+i(internal_user, #state{user = U}) -> U;
+i(Item, _State) -> throw({bad_argument, Item}).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+infos(Items, State) ->
+ [{Item, i(Item, State)} || Item <- Items].
+
+connection_info(State = #state{adapter_info = I}) ->
+ infos(?CREATION_EVENT_KEYS, State) ++ I#amqp_adapter_info.additional_info.
+
+connect(Params = #amqp_params_direct{username = Username,
+ password = Password,
+ node = Node,
+ adapter_info = Info,
+ virtual_host = VHost},
+ SIF, _TypeSup, State) ->
+ State1 = State#state{node = Node,
+ vhost = VHost,
+ params = Params,
+ adapter_info = ensure_adapter_info(Info),
+ connected_at =
+ os:system_time(milli_seconds)},
+ DecryptedPassword = credentials_obfuscation:decrypt(Password),
+ case rpc:call(Node, rabbit_direct, connect,
+ [{Username, DecryptedPassword}, VHost, ?PROTOCOL, self(),
+ connection_info(State1)]) of
+ {ok, {User, ServerProperties}} ->
+ {ok, ChMgr, Collector} = SIF(i(name, State1)),
+ State2 = State1#state{user = User,
+ collector = Collector},
+ %% There's no real connection-level process on the remote
+ %% node for us to monitor or link to, but we want to
+ %% detect connection death if the remote node goes down
+ %% when there are no channels. So we monitor the
+ %% supervisor; that way we find out if the node goes down
+ %% or the rabbit app stops.
+ erlang:monitor(process, {rabbit_direct_client_sup, Node}),
+ {ok, {ServerProperties, 0, ChMgr, State2}};
+ {error, _} = E ->
+ E;
+ {badrpc, nodedown} ->
+ {error, {nodedown, Node}}
+ end.
+
+ensure_adapter_info(none) ->
+ ensure_adapter_info(#amqp_adapter_info{});
+
+ensure_adapter_info(A = #amqp_adapter_info{protocol = unknown}) ->
+ ensure_adapter_info(A#amqp_adapter_info{
+ protocol = {'Direct', ?PROTOCOL:version()}});
+
+ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
+ Name = list_to_binary(rabbit_misc:pid_to_string(self())),
+ ensure_adapter_info(A#amqp_adapter_info{name = Name});
+
+ensure_adapter_info(Info) -> Info.
+
+socket_adapter_info(Sock, Protocol) ->
+ {PeerHost, PeerPort, Host, Port} =
+ case rabbit_net:socket_ends(Sock, inbound) of
+ {ok, Res} -> Res;
+ _ -> {unknown, unknown, unknown, unknown}
+ end,
+ Name = case rabbit_net:connection_string(Sock, inbound) of
+ {ok, Res1} -> Res1;
+ _Error -> "(unknown)"
+ end,
+ #amqp_adapter_info{protocol = Protocol,
+ name = list_to_binary(Name),
+ host = Host,
+ port = Port,
+ peer_host = PeerHost,
+ peer_port = PeerPort,
+ additional_info = maybe_ssl_info(Sock)}.
+
+maybe_ssl_info(Sock) ->
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case rabbit_net:is_ssl(RealSocket) of
+ true -> [{ssl, true}] ++ ssl_info(RealSocket) ++ ssl_cert_info(RealSocket);
+ false -> [{ssl, false}]
+ end.
+
+ssl_info(Sock) ->
+ {Protocol, KeyExchange, Cipher, Hash} =
+ case rabbit_net:ssl_info(Sock) of
+ {ok, Infos} ->
+ {_, P} = lists:keyfind(protocol, 1, Infos),
+ #{cipher := C,
+ key_exchange := K,
+ mac := H} = proplists:get_value(
+ selected_cipher_suite, Infos),
+ {P, K, C, H};
+ _ ->
+ {unknown, unknown, unknown, unknown}
+ end,
+ [{ssl_protocol, Protocol},
+ {ssl_key_exchange, KeyExchange},
+ {ssl_cipher, Cipher},
+ {ssl_hash, Hash}].
+
+ssl_cert_info(Sock) ->
+ case rabbit_net:peercert(Sock) of
+ {ok, Cert} ->
+ [{peer_cert_issuer, list_to_binary(
+ rabbit_cert_info:issuer(Cert))},
+ {peer_cert_subject, list_to_binary(
+ rabbit_cert_info:subject(Cert))},
+ {peer_cert_validity, list_to_binary(
+ rabbit_cert_info:validity(Cert))}];
+ _ ->
+ []
+ end.