diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-07-23 15:01:22 +0100 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-07-24 15:38:50 +0100 |
commit | 4284f205bddfe18f4dbfc050b07f97ec00387954 (patch) | |
tree | 9fced68eb1ea3865a428d0fada8d2740c0c2448c | |
parent | 37f6a35ddecbb88b7dc4538ff2a57d2cba0836a1 (diff) | |
download | rabbitmq-server-git-4284f205bddfe18f4dbfc050b07f97ec00387954.tar.gz |
Switch remaining processes to gen_statem
-rw-r--r-- | deps/amqp10_client/src/amqp10_client.erl | 7 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client.hrl | 1 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client_connection.erl | 166 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client_frame_reader.erl | 6 | ||||
-rw-r--r-- | deps/amqp10_client/src/amqp10_client_session.erl | 210 | ||||
-rw-r--r-- | deps/amqp10_client/test/system_SUITE.erl | 22 |
6 files changed, 207 insertions, 205 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl index 50cea18806..4420cbd2b3 100644 --- a/deps/amqp10_client/src/amqp10_client.erl +++ b/deps/amqp10_client/src/amqp10_client.erl @@ -10,11 +10,6 @@ -include("amqp10_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). --ifdef(nowarn_deprecated_gen_fsm). --compile({nowarn_deprecated_function, - [{gen_fsm, send_event, 2}]}). --endif. - -export([open_connection/1, open_connection/2, close_connection/1, @@ -158,7 +153,7 @@ begin_session_sync(Connection, Timeout) when is_pid(Connection) -> %% {amqp10_event, {session, SessionPid, {ended, Why}}} -spec end_session(pid()) -> ok. end_session(Pid) -> - gen_fsm:send_event(Pid, 'end'). + amqp10_client_session:'end'(Pid). %% @doc Synchronously attach a link on 'Session'. %% This is a convenience function that awaits attached event diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl index 030e310d19..0f2b6917cb 100644 --- a/deps/amqp10_client/src/amqp10_client.hrl +++ b/deps/amqp10_client/src/amqp10_client.hrl @@ -11,6 +11,7 @@ -define(MAX_MAX_FRAME_SIZE, 1024 * 1024). -define(FRAME_HEADER_SIZE, 8). +-define(TIMEOUT, 5000). % -define(debug, true). -ifdef(debug). diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl index 71195fecb1..5858e36c5a 100644 --- a/deps/amqp10_client/src/amqp10_client_connection.erl +++ b/deps/amqp10_client/src/amqp10_client_connection.erl @@ -7,7 +7,7 @@ -module(amqp10_client_connection). --behaviour(gen_fsm). +-behaviour(gen_statem). -include("amqp10_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). @@ -34,21 +34,19 @@ %% gen_fsm callbacks. -export([init/1, - handle_event/3, - handle_sync_event/4, - handle_info/3, + callback_mode/0, terminate/3, code_change/4]). %% gen_fsm state callbacks. --export([expecting_socket/2, - sasl_hdr_sent/2, - sasl_hdr_rcvds/2, - sasl_init_sent/2, - hdr_sent/2, - open_sent/2, - opened/2, - close_sent/2]). +-export([expecting_socket/3, + sasl_hdr_sent/3, + sasl_hdr_rcvds/3, + sasl_init_sent/3, + hdr_sent/3, + open_sent/3, + opened/3, + close_sent/3]). -type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}. @@ -72,7 +70,10 @@ % set to a negative value to allow a sender to "overshoot" the flow % control by this margin transfer_limit_margin => 0 | neg_integer(), - sasl => none | anon | {plain, User :: binary(), Pwd :: binary()} + sasl => none | anon | {plain, User :: binary(), Pwd :: binary()}, + notify => pid(), + notify_when_opened => pid() | none, + notify_when_closed => pid() | none }. -record(state, @@ -122,45 +123,48 @@ open(Config) -> -spec close(pid(), {amqp10_client_types:amqp_error() | amqp10_client_types:connection_error(), binary()} | none) -> ok. close(Pid, Reason) -> - gen_fsm:send_event(Pid, {close, Reason}). + gen_statem:cast(Pid, {close, Reason}). %% ------------------------------------------------------------------- %% Private API. %% ------------------------------------------------------------------- start_link(Sup, Config) -> - gen_fsm:start_link(?MODULE, [Sup, Config], []). + gen_statem:start_link(?MODULE, [Sup, Config], []). set_other_procs(Pid, OtherProcs) -> - gen_fsm:send_all_state_event(Pid, {set_other_procs, OtherProcs}). + gen_statem:cast(Pid, {set_other_procs, OtherProcs}). -spec socket_ready(pid(), amqp10_socket()) -> ok. socket_ready(Pid, Socket) -> - gen_fsm:send_event(Pid, {socket_ready, Socket}). + gen_statem:cast(Pid, {socket_ready, Socket}). -spec protocol_header_received(pid(), 0 | 3, non_neg_integer(), non_neg_integer(), non_neg_integer()) -> ok. protocol_header_received(Pid, Protocol, Maj, Min, Rev) -> - gen_fsm:send_event(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}). + gen_statem:cast(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}). -spec begin_session(pid()) -> supervisor:startchild_ret(). begin_session(Pid) -> - gen_fsm:sync_send_all_state_event(Pid, begin_session). + gen_statem:call(Pid, begin_session, {dirty_timeout, ?TIMEOUT}). heartbeat(Pid) -> - gen_fsm:send_event(Pid, heartbeat). + gen_statem:cast(Pid, heartbeat). %% ------------------------------------------------------------------- %% gen_fsm callbacks. %% ------------------------------------------------------------------- +callback_mode() -> [state_functions]. + init([Sup, Config0]) -> process_flag(trap_exit, true), Config = maps:merge(config_defaults(), Config0), {ok, expecting_socket, #state{connection_sup = Sup, config = Config}}. -expecting_socket({socket_ready, Socket}, State = #state{config = Cfg}) -> +expecting_socket(_EvtType, {socket_ready, Socket}, + State = #state{config = Cfg}) -> State1 = State#state{socket = Socket}, case Cfg of #{sasl := none} -> @@ -169,12 +173,25 @@ expecting_socket({socket_ready, Socket}, State = #state{config = Cfg}) -> _ -> ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER), {next_state, sasl_hdr_sent, State1} - end. + end; +expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) -> + {keep_state, set_other_procs0(OtherProcs, State)}; +expecting_socket({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + %% The caller already asked for a new session but the connection + %% isn't fully opened. Let's queue this request until the connection + %% is ready. + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -sasl_hdr_sent({protocol_header_received, 3, 1, 0, 0}, State) -> - {next_state, sasl_hdr_rcvds, State}. +sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) -> + {next_state, sasl_hdr_rcvds, State}; +sasl_hdr_sent({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -sasl_hdr_rcvds(#'v1_0.sasl_mechanisms'{ +sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{ sasl_server_mechanisms = {array, symbol, Mechs}}, State = #state{config = #{sasl := Sasl}}) -> SaslBin = {symbol, sasl_to_bin(Sasl)}, @@ -186,28 +203,41 @@ sasl_hdr_rcvds(#'v1_0.sasl_mechanisms'{ {next_state, sasl_init_sent, State}; false -> {stop, {sasl_not_supported, Sasl}, State} - end. + end; +sasl_hdr_rcvds({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -sasl_init_sent(#'v1_0.sasl_outcome'{code = {ubyte, 0}}, +sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}}, #state{socket = Socket} = State) -> ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER), {next_state, hdr_sent, State}; -sasl_init_sent(#'v1_0.sasl_outcome'{code = {ubyte, C}}, +sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}}, #state{} = State) when C==1;C==2;C==3;C==4 -> - {stop, sasl_auth_failure, State}. + {stop, sasl_auth_failure, State}; +sasl_init_sent({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -hdr_sent({protocol_header_received, 0, 1, 0, 0}, State) -> +hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) -> case send_open(State) of ok -> {next_state, open_sent, State}; Error -> {stop, Error, State} end; -hdr_sent({protocol_header_received, Protocol, Maj, Min, +hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min, Rev}, State) -> error_logger:warning_msg("Unsupported protocol version: ~b ~b.~b.~b~n", [Protocol, Maj, Min, Rev]), - {stop, normal, State}. + {stop, normal, State}; +hdr_sent({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -open_sent(#'v1_0.open'{max_frame_size = MFSz, idle_time_out = Timeout}, +open_sent(_EvtType, #'v1_0.open'{max_frame_size = MFSz, + idle_time_out = Timeout}, #state{pending_session_reqs = PendingSessionReqs, config = Config} = State0) -> State = case Timeout of @@ -223,17 +253,21 @@ open_sent(#'v1_0.open'{max_frame_size = MFSz, idle_time_out = Timeout}, State2 = lists:foldr( fun(From, S0) -> {Ret, S2} = handle_begin_session(From, S0), - _ = gen_fsm:reply(From, Ret), + _ = gen_statem:reply(From, Ret), S2 end, State1, PendingSessionReqs), ok = notify_opened(Config), - {next_state, opened, State2}. + {next_state, opened, State2}; +open_sent({call, From}, begin_session, + #state{pending_session_reqs = PendingSessionReqs} = State) -> + State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, + {keep_state, State1}. -opened(heartbeat, State = #state{idle_time_out = T}) -> +opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) -> ok = send_heartbeat(State), {ok, Tmr} = start_heartbeat_timer(T), - {next_state, opened, State#state{heartbeat_timer = Tmr}}; -opened({close, Reason}, State = #state{config = Config}) -> + {keep_state, State#state{heartbeat_timer = Tmr}}; +opened(_EvtType, {close, Reason}, State = #state{config = Config}) -> %% We send the first close frame and wait for the reply. %% TODO: stop all sessions writing %% We could still accept incoming frames (See: 2.4.6) @@ -243,61 +277,39 @@ opened({close, Reason}, State = #state{config = Config}) -> {error, closed} -> {stop, normal, State}; Error -> {stop, Error, State} end; -opened(#'v1_0.close'{error = Error}, State = #state{config = Config}) -> +opened(_EvtType, #'v1_0.close'{error = Error}, State = #state{config = Config}) -> %% We receive the first close frame, reply and terminate. ok = notify_closed(Config, translate_err(Error)), _ = send_close(State, none), {stop, normal, State}; -opened(Frame, State) -> +opened({call, From}, begin_session, State) -> + {Ret, State1} = handle_begin_session(From, State), + {keep_state, State1, [{reply, From, Ret}]}; +opened(info, {'DOWN', MRef, _, _, _Info}, + State = #state{reader_m_ref = MRef, config = Config}) -> + %% reader has gone down and we are not already shutting down + ok = notify_closed(Config, shutdown), + {stop, normal, State}; +opened(_EvtType, Frame, State) -> error_logger:warning_msg("Unexpected connection frame ~p when in state ~p ~n", [Frame, State]), - {next_state, opened, State}. + {keep_state, State}. -close_sent(heartbeat, State) -> +close_sent(_EvtType, heartbeat, State) -> {next_state, close_sent, State}; -close_sent(#'v1_0.close'{}, State) -> +close_sent(_EvtType, #'v1_0.close'{}, State) -> %% TODO: we should probably set up a timer before this to ensure %% we close down event if no reply is received {stop, normal, State}. -handle_event({set_other_procs, OtherProcs}, StateName, State) -> +set_other_procs0(OtherProcs, State) -> #{sessions_sup := SessionsSup, reader := Reader} = OtherProcs, ReaderMRef = monitor(process, Reader), amqp10_client_frame_reader:set_connection(Reader, self()), - State1 = State#state{sessions_sup = SessionsSup, - reader_m_ref = ReaderMRef, - reader = Reader}, - {next_state, StateName, State1}; -handle_event(_Event, StateName, State) -> - {next_state, StateName, State}. - -handle_sync_event(begin_session, From, opened, State) -> - {Ret, State1} = handle_begin_session(From, State), - {reply, Ret, opened, State1}; -handle_sync_event(begin_session, From, StateName, - #state{pending_session_reqs = PendingSessionReqs} = State) - when StateName =/= close_sent -> - %% The caller already asked for a new session but the connection - %% isn't fully opened. Let's queue this request until the connection - %% is ready. - State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]}, - {next_state, StateName, State1}; -handle_sync_event(begin_session, _From, StateName, State) -> - {reply, {error, connection_closed}, StateName, State}; -handle_sync_event(_Event, _From, StateName, State) -> - Reply = ok, - {reply, Reply, StateName, State}. - -handle_info({'DOWN', MRef, _, _, _Info}, StateName, State = #state{reader_m_ref = MRef, - config = Config}) - when StateName =/= close_sent -> - %% reader has gone down and we are not already shutting down - ok = notify_closed(Config, shutdown), - {stop, normal, State}; -handle_info(Info, StateName, State) -> - error_logger:info_msg("Conn handle_info ~p ~p~n", [Info, StateName]), - {next_state, StateName, State}. + State#state{sessions_sup = SessionsSup, + reader_m_ref = ReaderMRef, + reader = Reader}. terminate(Reason, _StateName, #state{connection_sup = Sup, config = Config}) -> diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl index 5cc14a2047..524ead07ee 100644 --- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl +++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl @@ -196,13 +196,11 @@ maybe_close_socket(undefined) -> maybe_close_socket(Socket) -> close_socket(Socket). --dialyzer({no_fail_call, close_socket/1}). close_socket({tcp, Socket}) -> gen_tcp:close(Socket); close_socket({ssl, Socket}) -> ssl:close(Socket). --dialyzer({no_fail_call, set_active_once/1}). set_active_once(#state{socket = {tcp, Socket}}) -> ok = inet:setopts(Socket, [{active, once}]); set_active_once(#state{socket = {ssl, Socket}}) -> @@ -283,8 +281,8 @@ route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) -> State0), ?DBG("FRAME -> ~p ~p~n ~p~n", [Channel, DestinationPid, Performative]), case Payload of - <<>> -> ok = gen_fsm:send_event(DestinationPid, Performative); - _ -> ok = gen_fsm:send_event(DestinationPid, Frame) + <<>> -> ok = gen_statem:cast(DestinationPid, Performative); + _ -> ok = gen_statem:cast(DestinationPid, Frame) end, State. diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl index bc5f42fdee..a6e60da65d 100644 --- a/deps/amqp10_client/src/amqp10_client_session.erl +++ b/deps/amqp10_client/src/amqp10_client_session.erl @@ -6,22 +6,11 @@ %% -module(amqp10_client_session). --behaviour(gen_fsm). +-behaviour(gen_statem). -include("amqp10_client.hrl"). -include_lib("amqp10_common/include/amqp10_framing.hrl"). --ifdef(nowarn_deprecated_gen_fsm). --compile({nowarn_deprecated_function, - [{gen_fsm, reply, 2}, - {gen_fsm, send_all_state_event, 2}, - {gen_fsm, send_event, 2}, - {gen_fsm, start_link, 3}, - {gen_fsm, sync_send_all_state_event, 2}, - {gen_fsm, sync_send_event, 2}, - {gen_fsm, sync_send_event, 3}]}). --endif. - %% Public API. -export(['begin'/1, begin_sync/1, @@ -39,21 +28,21 @@ socket_ready/2 ]). -%% gen_fsm callbacks. --export([init/1, - unmapped/2, +%% gen_statem callbacks +-export([ + init/1, + terminate/3, + code_change/4, + callback_mode/0 + ]). + +%% gen_statem state callbacks. +-export([ unmapped/3, - begin_sent/2, begin_sent/3, - mapped/2, mapped/3, - end_sent/2, - end_sent/3, - handle_event/3, - handle_sync_event/4, - handle_info/3, - terminate/3, - code_change/4]). + end_sent/3 + ]). -define(MAX_SESSION_WINDOW_SIZE, 65535). -define(DEFAULT_MAX_HANDLE, 16#ffffffff). @@ -180,31 +169,31 @@ begin_sync(Connection, Timeout) -> -spec 'end'(pid()) -> ok. 'end'(Pid) -> - gen_fsm:send_event(Pid, 'end'). + gen_statem:cast(Pid, 'end'). -spec attach(pid(), attach_args()) -> {ok, link_ref()}. attach(Session, Args) -> - {ok, LinkRef} = gen_fsm:sync_send_event(Session, {attach, Args}), - {ok, LinkRef}. + gen_statem:call(Session, {attach, Args}, {dirty_timeout, ?TIMEOUT}). -spec detach(pid(), link_handle()) -> ok | {error, link_not_found | half_attached}. detach(Session, Handle) -> - gen_fsm:sync_send_event(Session, {detach, Handle}). + gen_statem:call(Session, {detach, Handle}, {dirty_timeout, ?TIMEOUT}). -spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) -> ok | {error, insufficient_credit | link_not_found | half_attached}. transfer(Session, Amqp10Msg, Timeout) -> [Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg), - gen_fsm:sync_send_event(Session, {transfer, Transfer, Records}, Timeout). + gen_statem:call(Session, {transfer, Transfer, Records}, + {dirty_timeout, Timeout}). flow(Session, Handle, Flow, RenewAfter) -> - gen_fsm:send_event(Session, {flow, Handle, Flow, RenewAfter}). + gen_statem:cast(Session, {flow, Handle, Flow, RenewAfter}). -spec disposition(pid(), link_role(), transfer_id(), transfer_id(), boolean(), amqp10_client_types:delivery_state()) -> ok. disposition(Session, Role, First, Last, Settled, DeliveryState) -> - gen_fsm:sync_send_event(Session, {disposition, Role, First, Last, Settled, - DeliveryState}). + gen_statem:call(Session, {disposition, Role, First, Last, Settled, + DeliveryState}, {dirty_timeout, ?TIMEOUT}). @@ -213,16 +202,18 @@ disposition(Session, Role, First, Last, Settled, DeliveryState) -> %% ------------------------------------------------------------------- start_link(From, Channel, Reader, ConnConfig) -> - gen_fsm:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []). + gen_statem:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []). -spec socket_ready(pid(), amqp10_client_connection:amqp10_socket()) -> ok. socket_ready(Pid, Socket) -> - gen_fsm:send_event(Pid, {socket_ready, Socket}). + gen_statem:cast(Pid, {socket_ready, Socket}). %% ------------------------------------------------------------------- -%% gen_fsm callbacks. +%% gen_statem callbacks. %% ------------------------------------------------------------------- +callback_mode() -> [state_functions]. + init([FromPid, Channel, Reader, ConnConfig]) -> process_flag(trap_exit, true), amqp10_client_frame_reader:register_session(Reader, self(), Channel), @@ -230,26 +221,25 @@ init([FromPid, Channel, Reader, ConnConfig]) -> connection_config = ConnConfig}, {ok, unmapped, State}. -unmapped({socket_ready, Socket}, State) -> +unmapped(cast, {socket_ready, Socket}, State) -> State1 = State#state{socket = Socket}, ok = send_begin(State1), - {next_state, begin_sent, State1}. - -unmapped({attach, Attach}, From, + {next_state, begin_sent, State1}; +unmapped({call, From}, {attach, Attach}, #state{early_attach_requests = EARs} = State) -> - {next_state, unmapped, + {keep_state, State#state{early_attach_requests = [{From, Attach} | EARs]}}. -begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, - next_outgoing_id = {uint, NOI}, - incoming_window = {uint, InWindow}, - outgoing_window = {uint, OutWindow}}, +begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, + next_outgoing_id = {uint, NOI}, + incoming_window = {uint, InWindow}, + outgoing_window = {uint, OutWindow}}, #state{early_attach_requests = EARs} = State) -> State1 = State#state{remote_channel = RemoteChannel}, State2 = lists:foldr(fun({From, Attach}, S) -> {S2, H} = send_attach(fun send/2, Attach, From, S), - gen_fsm:reply(From, {ok, H}), + gen_statem:reply(From, {ok, H}), S2 end, State1, EARs), @@ -258,21 +248,20 @@ begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel}, {next_state, mapped, State2#state{early_attach_requests = [], next_incoming_id = NOI, remote_incoming_window = InWindow, - remote_outgoing_window = OutWindow}}. - -begin_sent({attach, Attach}, From, - #state{early_attach_requests = EARs} = State) -> - {next_state, begin_sent, + remote_outgoing_window = OutWindow}}; +begin_sent({call, From}, {attach, Attach}, + #state{early_attach_requests = EARs} = State) -> + {keep_state, State#state{early_attach_requests = [{From, Attach} | EARs]}}. -mapped('end', State) -> +mapped(cast, 'end', State) -> %% We send the first end frame and wait for the reply. send_end(State), {next_state, end_sent, State}; -mapped({flow, OutHandle, Flow0, RenewAfter}, State0) -> +mapped(cast, {flow, OutHandle, Flow0, RenewAfter}, State0) -> State = send_flow(fun send/2, OutHandle, Flow0, RenewAfter, State0), {next_state, mapped, State}; -mapped(#'v1_0.end'{error = Err}, State) -> +mapped(cast, #'v1_0.end'{error = Err}, State) -> %% We receive the first end frame, reply and terminate. _ = send_end(State), % TODO: send notifications for links? @@ -282,11 +271,11 @@ mapped(#'v1_0.end'{error = Err}, State) -> end, ok = notify_session_ended(State, Reason), {stop, normal, State}; -mapped(#'v1_0.attach'{name = {utf8, Name}, - initial_delivery_count = IDC, - handle = {uint, InHandle}}, - #state{links = Links, link_index = LinkIndex, - link_handle_index = LHI} = State0) -> +mapped(cast, #'v1_0.attach'{name = {utf8, Name}, + initial_delivery_count = IDC, + handle = {uint, InHandle}}, + #state{links = Links, link_index = LinkIndex, + link_handle_index = LHI} = State0) -> #{Name := OutHandle} = LinkIndex, #{OutHandle := Link0} = Links, @@ -302,8 +291,9 @@ mapped(#'v1_0.attach'{name = {utf8, Name}, link_index = maps:remove(Name, LinkIndex), link_handle_index = LHI#{InHandle => OutHandle}}, {next_state, mapped, State}; - -mapped(#'v1_0.detach'{handle = {uint, InHandle}, closed = Closed, error = Err}, +mapped(cast, #'v1_0.detach'{handle = {uint, InHandle}, + closed = Closed, + error = Err}, #state{links = Links, link_handle_index = LHI} = State0) when Closed =:= true orelse Closed =:= undefined -> with_link(InHandle, State0, @@ -317,11 +307,10 @@ mapped(#'v1_0.detach'{handle = {uint, InHandle}, closed = Closed, error = Err}, State#state{links = maps:remove(OutHandle, Links), link_handle_index = maps:remove(InHandle, LHI)}} end); - -mapped(#'v1_0.flow'{handle = undefined} = Flow, State0) -> +mapped(cast, #'v1_0.flow'{handle = undefined} = Flow, State0) -> State = handle_session_flow(Flow, State0), {next_state, mapped, State}; -mapped(#'v1_0.flow'{handle = {uint, InHandle}} = Flow, +mapped(cast, #'v1_0.flow'{handle = {uint, InHandle}} = Flow, #state{links = Links} = State0) -> State = handle_session_flow(Flow, State0), @@ -335,8 +324,7 @@ mapped(#'v1_0.flow'{handle = {uint, InHandle}} = Flow, Links1 = Links#{OutHandle => Link}, State1 = State#state{links = Links1}, {next_state, mapped, State1}; - -mapped({#'v1_0.transfer'{handle = {uint, InHandle}, +mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, more = true} = Transfer, Payload}, #state{links = Links} = State0) -> @@ -348,7 +336,7 @@ mapped({#'v1_0.transfer'{handle = {uint, InHandle}, State = book_partial_transfer_received( State0#state{links = Links#{OutHandle => Link1}}), {next_state, mapped, State}; -mapped({#'v1_0.transfer'{handle = {uint, InHandle}, +mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle}, delivery_id = MaybeDeliveryId, settled = Settled} = Transfer0, Payload0}, #state{incoming_unsettled = Unsettled0} = State0) -> @@ -395,9 +383,10 @@ mapped({#'v1_0.transfer'{handle = {uint, InHandle}, % role=true indicates the disposition is from a `receiver`. i.e. from the % clients point of view these are dispositions relating to `sender`links -mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First}, +mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First}, last = Last0, state = DeliveryState}, #state{unsettled = Unsettled0} = State) -> + ct:pal("DISPOSITION in SESS ~p", [Unsettled0]), Last = case Last0 of undefined -> First; {uint, L} -> L @@ -416,55 +405,54 @@ mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First}, end, Unsettled0, lists:seq(First, Last)), {next_state, mapped, State#state{unsettled = Unsettled}}; -mapped(Frame, State) -> +mapped(cast, Frame, State) -> error_logger:warning_msg("Unhandled session frame ~p in state ~p~n", [Frame, State]), - {next_state, mapped, State}. - -%% mapped/3 -%% TODO:: settled = false is probably dependent on the snd_settle_mode -%% e.g. if snd_settle_mode = settled it may not be valid to send an -%% unsettled transfer. Who knows really? -%% -%% Transfer. See spec section: 2.6.12 -mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}, + {next_state, mapped, State}; +mapped({call, From}, + {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}, delivery_tag = {binary, DeliveryTag}, settled = false} = Transfer0, Parts}, - From, #state{next_outgoing_id = NOI, links = Links, - unsettled = Unsettled} = State) -> + #state{next_outgoing_id = NOI, links = Links, + unsettled = Unsettled} = State) -> case Links of #{OutHandle := #link{input_handle = undefined}} -> - {reply, {error, half_attached}, mapped, State}; + {keep_state, State, [{reply, From, {error, half_attached}}]}; #{OutHandle := #link{link_credit = LC}} when LC =< 0 -> - {reply, {error, insufficient_credit}, mapped, State}; + {keep_state, State, [{reply, From, {error, insufficient_credit}}]}; #{OutHandle := Link} -> Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NOI), resume = false}, {ok, NumFrames} = send_transfer(Transfer, Parts, State), State1 = State#state{unsettled = Unsettled#{NOI => {DeliveryTag, From}}}, - {reply, ok, mapped, book_transfer_send(NumFrames, Link, State1)}; + {keep_state, book_transfer_send(NumFrames, Link, State1), + [{reply, From, ok}]}; _ -> - {reply, {error, link_not_found}, mapped, State} + {keep_state, State, [{reply, From, {error, link_not_found}}]} + end; -mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0, - Parts}, _From, #state{next_outgoing_id = NOI, - links = Links} = State) -> +mapped({call, From}, + {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0, + Parts}, #state{next_outgoing_id = NOI, + links = Links} = State) -> case Links of #{OutHandle := #link{input_handle = undefined}} -> - {reply, {error, half_attached}, mapped, State}; + {keep_state_and_data, [{reply, From, {error, half_attached}}]}; #{OutHandle := #link{link_credit = LC}} when LC =< 0 -> - {reply, {error, insufficient_credit}, mapped, State}; + {keep_state_and_data, [{reply, From, {error, insufficient_credit}}]}; #{OutHandle := Link} -> Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NOI)}, {ok, NumFrames} = send_transfer(Transfer, Parts, State), % TODO look into if erlang will correctly wrap integers during % binary conversion. - {reply, ok, mapped, book_transfer_send(NumFrames, Link, State)}; + {keep_state, book_transfer_send(NumFrames, Link, State), + [{reply, From, ok}]}; _ -> - {reply, {error, link_not_found}, mapped, State} + {keep_state, [{reply, From, {error, link_not_found}}]} end; -mapped({disposition, Role, First, Last, Settled0, DeliveryState}, _From, +mapped({call, From}, + {disposition, Role, First, Last, Settled0, DeliveryState}, #state{incoming_unsettled = Unsettled0, links = Links0} = State0) -> Disposition = @@ -494,36 +482,27 @@ mapped({disposition, Role, First, Last, Settled0, DeliveryState}, _From, Res = send(Disposition, State), - {reply, Res, mapped, State}; + {keep_state, State, [{reply, From, Res}]}; -mapped({attach, Attach}, From, State) -> +mapped({call, From}, {attach, Attach}, State) -> {State1, LinkRef} = send_attach(fun send/2, Attach, From, State), - {reply, {ok, LinkRef}, mapped, State1}; + {keep_state, State1, [{reply, From, {ok, LinkRef}}]}; -mapped(Msg, From, State) -> +mapped({call, From}, Msg, State) -> {Reply, State1} = send_detach(fun send/2, Msg, From, State), - {reply, Reply, mapped, State1}. + {keep_state, State1, [{reply, From, Reply}]}; -end_sent(#'v1_0.end'{}, State) -> - {stop, normal, State}; -end_sent(_Frame, State) -> - % just drop frames here - {next_state, end_sent, State}. +mapped(_EvtType, Msg, _State) -> + error_logger:info_msg("amqp10_session: unhandled msg in mapped state ~W", + [Msg, 10]), + keep_state_and_data. -end_sent(_Frame, _From, State) -> +end_sent(_EvtType, #'v1_0.end'{}, State) -> + {stop, normal, State}; +end_sent(_EvtType, _Frame, State) -> % just drop frames here {next_state, end_sent, State}. -handle_event(_Event, StateName, State) -> - {next_state, StateName, State}. - -handle_sync_event(_Event, _From, StateName, State) -> - Reply = ok, - {reply, Reply, StateName, State}. - -handle_info(_Info, StateName, State) -> - {next_state, StateName, State}. - terminate(Reason, _StateName, #state{channel = Channel, remote_channel = RemoteChannel, reader = Reader}) -> @@ -890,13 +869,14 @@ notify_session_ended(#state{notify = Pid}, Reason) -> ok. notify_disposition({Pid, _}, SessionDeliveryTag) -> + ct:pal("notify_disposition to ~w", [ Pid]), Pid ! {amqp10_disposition, SessionDeliveryTag}, ok. book_transfer_send(Num, #link{output_handle = Handle} = Link, - #state{next_outgoing_id = NOI, - remote_incoming_window = RIW, - links = Links} = State) -> + #state{next_outgoing_id = NOI, + remote_incoming_window = RIW, + links = Links} = State) -> State#state{next_outgoing_id = NOI+Num, remote_incoming_window = RIW-Num, links = Links#{Handle => incr_link_counters(Link)}}. diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl index b14859cb15..d81d48c4e3 100644 --- a/deps/amqp10_client/test/system_SUITE.erl +++ b/deps/amqp10_client/test/system_SUITE.erl @@ -259,6 +259,7 @@ open_connection_plain_sasl_failure(Config) -> ok = amqp10_client:close_connection(Connection). basic_roundtrip(Config) -> + application:start(sasl), Hostname = ?config(rmq_hostname, Config), Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), OpenConf = #{address => Hostname, port => Port, sasl => anon}, @@ -473,6 +474,7 @@ transfer_unsettled(Config) -> DeliveryTag = <<"my-tag">>, Msg = amqp10_msg:new(DeliveryTag, Data, false), ok = amqp10_client:send_msg(Sender, Msg), + ct:pal("test pid ~w", [self()]), ok = await_disposition(DeliveryTag), {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"data-receiver">>, @@ -506,7 +508,8 @@ subscribe(Config) -> {amqp10_event, {link, Receiver, credit_exhausted}} -> ok after 5000 -> - exit(credit_exhausted_assert) + flush(), + exit(credit_exhausted_assert) end, ok = amqp10_client:end_session(Session), @@ -708,7 +711,9 @@ receive_one(Receiver) -> await_disposition(DeliveryTag) -> receive {amqp10_disposition, {accepted, DeliveryTag}} -> ok - after 3000 -> exit(dispostion_timeout) + after 3000 -> + flush(), + exit(dispostion_timeout) end. await_link(Who, What, Err) -> @@ -717,8 +722,19 @@ await_link(Who, What, Err) -> ok; {amqp10_event, {link, Who, {detached, Why}}} -> exit(Why) - after 5000 -> exit(Err) + after 5000 -> + flush(), + exit(Err) end. to_bin(X) when is_list(X) -> list_to_binary(X). + +flush() -> + receive + Any -> + ct:pal("flush ~p", [Any]), + flush() + after 0 -> + ok + end. |