From 5cf16a3b62a7222d39142c4adb4189c9be943fcc Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Tue, 12 Jan 2016 10:09:58 -0800 Subject: deconflict tried deconflicting itself --- components/authorize/src/authorize_keys.erl | 4 +++- components/dlink_tcp/src/connection.erl | 2 +- components/dlink_tcp/src/dlink_tcp_rpc.erl | 20 ++++++++++++++------ components/rvi_common/src/rvi_common.erl | 4 +++- 4 files changed, 21 insertions(+), 9 deletions(-) (limited to 'components') diff --git a/components/authorize/src/authorize_keys.erl b/components/authorize/src/authorize_keys.erl index a7bf84a..0abe0aa 100644 --- a/components/authorize/src/authorize_keys.erl +++ b/components/authorize/src/authorize_keys.erl @@ -245,7 +245,7 @@ code_change(_FromVsn, S, _Extra) -> creds_by_conn(Conn) -> ?debug("creds_by_conn(~p)~n", [Conn]), UTC = rvi_common:utc_timestamp(), - ?debug("all creds = ~p", [ets:tab2list(?CREDS)]), + ?debug("all creds = ~p", [abbrev(ets:tab2list(?CREDS))]), Creds = ets:select(?CREDS, [{ {{Conn,'_'}, #cred{jwt = '$1', validity = '$2', _='_'}}, @@ -683,6 +683,8 @@ abbrev_jwt({Hdr, Body} = X) -> abbrev_jwt(X) -> X. +abbrev_pl({K, #cred{} = C}) -> + {abbrev_pl(K), abbrev_pl(C)}; abbrev_pl(#cred{} = Payload) -> list_to_tuple(lists:map(fun(B) when is_binary(B) -> abbrev_bin(B); ([{_,_}|_]=L) -> abbrev_payload(L); diff --git a/components/dlink_tcp/src/connection.erl b/components/dlink_tcp/src/connection.erl index 1d4753e..c6a9531 100644 --- a/components/dlink_tcp/src/connection.erl +++ b/components/dlink_tcp/src/connection.erl @@ -266,7 +266,7 @@ handle_info({tcp, Sock, Data}, case dlink_data:decode(Data, fun(Elems) -> got_msg(Elems, State) end, DSt, ?MODULE, FragOpts) of - {ok, DSt1} = Ok -> + {ok, DSt1} -> inet:setopts(Sock, [{active, once}]), {noreply, State#st{decode_st = DSt1}}; {error, Reason} -> diff --git a/components/dlink_tcp/src/dlink_tcp_rpc.erl b/components/dlink_tcp/src/dlink_tcp_rpc.erl index 78de04a..9089c13 100644 --- a/components/dlink_tcp/src/dlink_tcp_rpc.erl +++ b/components/dlink_tcp/src/dlink_tcp_rpc.erl @@ -466,8 +466,10 @@ handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) -> ?debug("handle_socket, Arg (CS) = ~p", [Arg]), try handle_socket_(FromPid, IP, Port, Event, Arg) catch + exit:deconflict -> + {stop, St}; C:E -> - ?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), + ?error("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), error("Caught ~p:~p", [C, E]), ok end, @@ -476,8 +478,10 @@ handle_cast({handle_socket, FromPid, IP, Port, Event, Arg}, St) -> handle_cast({handle_socket, FromPid, IP, Port, Event, Payload, Arg}, St) -> try handle_socket_(FromPid, IP, Port, Event, Payload, Arg) catch + exit:deconflict -> + {stop, St}; C:E -> - ?debug("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), + ?error("Caught ~p:~p; ~p", [C, E, erlang:get_stacktrace()]), ok end, {noreply, St}; @@ -676,12 +680,16 @@ process_authorize(FromPid, PeerIP, PeerPort, RemoteAddress, RemotePort, ProtoVersion, Credentials, CompSpec) end, case connection_manager:find_connection_by_address(PeerIP, PeerPort) of - not_found -> F(); - BPid -> + not_found -> + F(); + {ok, FromPid} -> + F(); + {ok, BPid} -> deconflict_conns(FromPid, BPid, CompSpec, F) end. deconflict_conns(APid, BPid, CsA, F) -> + ?debug("deconflict_conns(~p, ~p, ...)", [APid, BPid]), {_, _} = ASrc = rvi_common:get_value(source_address, undefined, CsA), case connection:get_source_address(BPid) of undefined -> @@ -693,8 +701,8 @@ deconflict_conns(APid, BPid, CsA, F) -> exit(BPid, deconflict), F(); BSrc -> - ?debug("Deconflict - kill APid (~p - self): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]), - exit(deconflict) + ?debug("Deconflict - kill APid (~p): ASrc = ~p, BSrc = ~p", [APid, ASrc, BSrc]), + exit(APid, deconflict) end. diff --git a/components/rvi_common/src/rvi_common.erl b/components/rvi_common/src/rvi_common.erl index c227815..ca81b40 100644 --- a/components/rvi_common/src/rvi_common.erl +++ b/components/rvi_common/src/rvi_common.erl @@ -844,7 +844,7 @@ start_msgpack_rpc_client(Component, Module, Opts, XOpts) -> rvi_msgpack_rpc:start_link([{gproc, {n,l,Name}}|XOpts] ++ Opts). start_msgpack_rpc_server(Component, Module, Opts, XOpts) -> - Name = {msgpack_rpc_server, Component, Module}, + %% Name = {msgpack_rpc_server, Component, Module}, [Callback, Rest] = take([{callback, fun() -> msgpack_rpc_cb(Module) end}], XOpts ++ Opts), rvi_msgpack_rpc_server:start_link([{callback, Callback} | Rest]). @@ -1023,9 +1023,11 @@ take([], Opts) -> save_source_address(client, Socket, CS) -> {ok, {_, _} = Addr} = inet:peername(Socket), + ?debug("save_source_address (client: ~p): ~p", [self(), Addr]), set_value(source_address, Addr, CS); save_source_address(server, Socket, CS) -> {ok, {_, _} = Addr} = inet:sockname(Socket), + ?debug("save_source_address (server: ~p): ~p", [self(), Addr]), set_value(source_address, Addr, CS). get_source_address(CS) -> -- cgit v1.2.1