summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-07-23 15:01:22 +0100
committerkjnilsson <knilsson@pivotal.io>2020-07-24 15:38:50 +0100
commit4284f205bddfe18f4dbfc050b07f97ec00387954 (patch)
tree9fced68eb1ea3865a428d0fada8d2740c0c2448c
parent37f6a35ddecbb88b7dc4538ff2a57d2cba0836a1 (diff)
downloadrabbitmq-server-git-4284f205bddfe18f4dbfc050b07f97ec00387954.tar.gz
Switch remaining processes to gen_statem
-rw-r--r--deps/amqp10_client/src/amqp10_client.erl7
-rw-r--r--deps/amqp10_client/src/amqp10_client.hrl1
-rw-r--r--deps/amqp10_client/src/amqp10_client_connection.erl166
-rw-r--r--deps/amqp10_client/src/amqp10_client_frame_reader.erl6
-rw-r--r--deps/amqp10_client/src/amqp10_client_session.erl210
-rw-r--r--deps/amqp10_client/test/system_SUITE.erl22
6 files changed, 207 insertions, 205 deletions
diff --git a/deps/amqp10_client/src/amqp10_client.erl b/deps/amqp10_client/src/amqp10_client.erl
index 50cea18806..4420cbd2b3 100644
--- a/deps/amqp10_client/src/amqp10_client.erl
+++ b/deps/amqp10_client/src/amqp10_client.erl
@@ -10,11 +10,6 @@
-include("amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
--ifdef(nowarn_deprecated_gen_fsm).
--compile({nowarn_deprecated_function,
- [{gen_fsm, send_event, 2}]}).
--endif.
-
-export([open_connection/1,
open_connection/2,
close_connection/1,
@@ -158,7 +153,7 @@ begin_session_sync(Connection, Timeout) when is_pid(Connection) ->
%% {amqp10_event, {session, SessionPid, {ended, Why}}}
-spec end_session(pid()) -> ok.
end_session(Pid) ->
- gen_fsm:send_event(Pid, 'end').
+ amqp10_client_session:'end'(Pid).
%% @doc Synchronously attach a link on 'Session'.
%% This is a convenience function that awaits attached event
diff --git a/deps/amqp10_client/src/amqp10_client.hrl b/deps/amqp10_client/src/amqp10_client.hrl
index 030e310d19..0f2b6917cb 100644
--- a/deps/amqp10_client/src/amqp10_client.hrl
+++ b/deps/amqp10_client/src/amqp10_client.hrl
@@ -11,6 +11,7 @@
-define(MAX_MAX_FRAME_SIZE, 1024 * 1024).
-define(FRAME_HEADER_SIZE, 8).
+-define(TIMEOUT, 5000).
% -define(debug, true).
-ifdef(debug).
diff --git a/deps/amqp10_client/src/amqp10_client_connection.erl b/deps/amqp10_client/src/amqp10_client_connection.erl
index 71195fecb1..5858e36c5a 100644
--- a/deps/amqp10_client/src/amqp10_client_connection.erl
+++ b/deps/amqp10_client/src/amqp10_client_connection.erl
@@ -7,7 +7,7 @@
-module(amqp10_client_connection).
--behaviour(gen_fsm).
+-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
@@ -34,21 +34,19 @@
%% gen_fsm callbacks.
-export([init/1,
- handle_event/3,
- handle_sync_event/4,
- handle_info/3,
+ callback_mode/0,
terminate/3,
code_change/4]).
%% gen_fsm state callbacks.
--export([expecting_socket/2,
- sasl_hdr_sent/2,
- sasl_hdr_rcvds/2,
- sasl_init_sent/2,
- hdr_sent/2,
- open_sent/2,
- opened/2,
- close_sent/2]).
+-export([expecting_socket/3,
+ sasl_hdr_sent/3,
+ sasl_hdr_rcvds/3,
+ sasl_init_sent/3,
+ hdr_sent/3,
+ open_sent/3,
+ opened/3,
+ close_sent/3]).
-type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}.
@@ -72,7 +70,10 @@
% set to a negative value to allow a sender to "overshoot" the flow
% control by this margin
transfer_limit_margin => 0 | neg_integer(),
- sasl => none | anon | {plain, User :: binary(), Pwd :: binary()}
+ sasl => none | anon | {plain, User :: binary(), Pwd :: binary()},
+ notify => pid(),
+ notify_when_opened => pid() | none,
+ notify_when_closed => pid() | none
}.
-record(state,
@@ -122,45 +123,48 @@ open(Config) ->
-spec close(pid(), {amqp10_client_types:amqp_error()
| amqp10_client_types:connection_error(), binary()} | none) -> ok.
close(Pid, Reason) ->
- gen_fsm:send_event(Pid, {close, Reason}).
+ gen_statem:cast(Pid, {close, Reason}).
%% -------------------------------------------------------------------
%% Private API.
%% -------------------------------------------------------------------
start_link(Sup, Config) ->
- gen_fsm:start_link(?MODULE, [Sup, Config], []).
+ gen_statem:start_link(?MODULE, [Sup, Config], []).
set_other_procs(Pid, OtherProcs) ->
- gen_fsm:send_all_state_event(Pid, {set_other_procs, OtherProcs}).
+ gen_statem:cast(Pid, {set_other_procs, OtherProcs}).
-spec socket_ready(pid(), amqp10_socket()) -> ok.
socket_ready(Pid, Socket) ->
- gen_fsm:send_event(Pid, {socket_ready, Socket}).
+ gen_statem:cast(Pid, {socket_ready, Socket}).
-spec protocol_header_received(pid(), 0 | 3, non_neg_integer(),
non_neg_integer(), non_neg_integer()) -> ok.
protocol_header_received(Pid, Protocol, Maj, Min, Rev) ->
- gen_fsm:send_event(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}).
+ gen_statem:cast(Pid, {protocol_header_received, Protocol, Maj, Min, Rev}).
-spec begin_session(pid()) -> supervisor:startchild_ret().
begin_session(Pid) ->
- gen_fsm:sync_send_all_state_event(Pid, begin_session).
+ gen_statem:call(Pid, begin_session, {dirty_timeout, ?TIMEOUT}).
heartbeat(Pid) ->
- gen_fsm:send_event(Pid, heartbeat).
+ gen_statem:cast(Pid, heartbeat).
%% -------------------------------------------------------------------
%% gen_fsm callbacks.
%% -------------------------------------------------------------------
+callback_mode() -> [state_functions].
+
init([Sup, Config0]) ->
process_flag(trap_exit, true),
Config = maps:merge(config_defaults(), Config0),
{ok, expecting_socket, #state{connection_sup = Sup,
config = Config}}.
-expecting_socket({socket_ready, Socket}, State = #state{config = Cfg}) ->
+expecting_socket(_EvtType, {socket_ready, Socket},
+ State = #state{config = Cfg}) ->
State1 = State#state{socket = Socket},
case Cfg of
#{sasl := none} ->
@@ -169,12 +173,25 @@ expecting_socket({socket_ready, Socket}, State = #state{config = Cfg}) ->
_ ->
ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER),
{next_state, sasl_hdr_sent, State1}
- end.
+ end;
+expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) ->
+ {keep_state, set_other_procs0(OtherProcs, State)};
+expecting_socket({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ %% The caller already asked for a new session but the connection
+ %% isn't fully opened. Let's queue this request until the connection
+ %% is ready.
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-sasl_hdr_sent({protocol_header_received, 3, 1, 0, 0}, State) ->
- {next_state, sasl_hdr_rcvds, State}.
+sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) ->
+ {next_state, sasl_hdr_rcvds, State};
+sasl_hdr_sent({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-sasl_hdr_rcvds(#'v1_0.sasl_mechanisms'{
+sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
sasl_server_mechanisms = {array, symbol, Mechs}},
State = #state{config = #{sasl := Sasl}}) ->
SaslBin = {symbol, sasl_to_bin(Sasl)},
@@ -186,28 +203,41 @@ sasl_hdr_rcvds(#'v1_0.sasl_mechanisms'{
{next_state, sasl_init_sent, State};
false ->
{stop, {sasl_not_supported, Sasl}, State}
- end.
+ end;
+sasl_hdr_rcvds({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-sasl_init_sent(#'v1_0.sasl_outcome'{code = {ubyte, 0}},
+sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}},
#state{socket = Socket} = State) ->
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
{next_state, hdr_sent, State};
-sasl_init_sent(#'v1_0.sasl_outcome'{code = {ubyte, C}},
+sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}},
#state{} = State) when C==1;C==2;C==3;C==4 ->
- {stop, sasl_auth_failure, State}.
+ {stop, sasl_auth_failure, State};
+sasl_init_sent({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-hdr_sent({protocol_header_received, 0, 1, 0, 0}, State) ->
+hdr_sent(_EvtType, {protocol_header_received, 0, 1, 0, 0}, State) ->
case send_open(State) of
ok -> {next_state, open_sent, State};
Error -> {stop, Error, State}
end;
-hdr_sent({protocol_header_received, Protocol, Maj, Min,
+hdr_sent(_EvtType, {protocol_header_received, Protocol, Maj, Min,
Rev}, State) ->
error_logger:warning_msg("Unsupported protocol version: ~b ~b.~b.~b~n",
[Protocol, Maj, Min, Rev]),
- {stop, normal, State}.
+ {stop, normal, State};
+hdr_sent({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-open_sent(#'v1_0.open'{max_frame_size = MFSz, idle_time_out = Timeout},
+open_sent(_EvtType, #'v1_0.open'{max_frame_size = MFSz,
+ idle_time_out = Timeout},
#state{pending_session_reqs = PendingSessionReqs,
config = Config} = State0) ->
State = case Timeout of
@@ -223,17 +253,21 @@ open_sent(#'v1_0.open'{max_frame_size = MFSz, idle_time_out = Timeout},
State2 = lists:foldr(
fun(From, S0) ->
{Ret, S2} = handle_begin_session(From, S0),
- _ = gen_fsm:reply(From, Ret),
+ _ = gen_statem:reply(From, Ret),
S2
end, State1, PendingSessionReqs),
ok = notify_opened(Config),
- {next_state, opened, State2}.
+ {next_state, opened, State2};
+open_sent({call, From}, begin_session,
+ #state{pending_session_reqs = PendingSessionReqs} = State) ->
+ State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
+ {keep_state, State1}.
-opened(heartbeat, State = #state{idle_time_out = T}) ->
+opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
ok = send_heartbeat(State),
{ok, Tmr} = start_heartbeat_timer(T),
- {next_state, opened, State#state{heartbeat_timer = Tmr}};
-opened({close, Reason}, State = #state{config = Config}) ->
+ {keep_state, State#state{heartbeat_timer = Tmr}};
+opened(_EvtType, {close, Reason}, State = #state{config = Config}) ->
%% We send the first close frame and wait for the reply.
%% TODO: stop all sessions writing
%% We could still accept incoming frames (See: 2.4.6)
@@ -243,61 +277,39 @@ opened({close, Reason}, State = #state{config = Config}) ->
{error, closed} -> {stop, normal, State};
Error -> {stop, Error, State}
end;
-opened(#'v1_0.close'{error = Error}, State = #state{config = Config}) ->
+opened(_EvtType, #'v1_0.close'{error = Error}, State = #state{config = Config}) ->
%% We receive the first close frame, reply and terminate.
ok = notify_closed(Config, translate_err(Error)),
_ = send_close(State, none),
{stop, normal, State};
-opened(Frame, State) ->
+opened({call, From}, begin_session, State) ->
+ {Ret, State1} = handle_begin_session(From, State),
+ {keep_state, State1, [{reply, From, Ret}]};
+opened(info, {'DOWN', MRef, _, _, _Info},
+ State = #state{reader_m_ref = MRef, config = Config}) ->
+ %% reader has gone down and we are not already shutting down
+ ok = notify_closed(Config, shutdown),
+ {stop, normal, State};
+opened(_EvtType, Frame, State) ->
error_logger:warning_msg("Unexpected connection frame ~p when in state ~p ~n",
[Frame, State]),
- {next_state, opened, State}.
+ {keep_state, State}.
-close_sent(heartbeat, State) ->
+close_sent(_EvtType, heartbeat, State) ->
{next_state, close_sent, State};
-close_sent(#'v1_0.close'{}, State) ->
+close_sent(_EvtType, #'v1_0.close'{}, State) ->
%% TODO: we should probably set up a timer before this to ensure
%% we close down event if no reply is received
{stop, normal, State}.
-handle_event({set_other_procs, OtherProcs}, StateName, State) ->
+set_other_procs0(OtherProcs, State) ->
#{sessions_sup := SessionsSup,
reader := Reader} = OtherProcs,
ReaderMRef = monitor(process, Reader),
amqp10_client_frame_reader:set_connection(Reader, self()),
- State1 = State#state{sessions_sup = SessionsSup,
- reader_m_ref = ReaderMRef,
- reader = Reader},
- {next_state, StateName, State1};
-handle_event(_Event, StateName, State) ->
- {next_state, StateName, State}.
-
-handle_sync_event(begin_session, From, opened, State) ->
- {Ret, State1} = handle_begin_session(From, State),
- {reply, Ret, opened, State1};
-handle_sync_event(begin_session, From, StateName,
- #state{pending_session_reqs = PendingSessionReqs} = State)
- when StateName =/= close_sent ->
- %% The caller already asked for a new session but the connection
- %% isn't fully opened. Let's queue this request until the connection
- %% is ready.
- State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
- {next_state, StateName, State1};
-handle_sync_event(begin_session, _From, StateName, State) ->
- {reply, {error, connection_closed}, StateName, State};
-handle_sync_event(_Event, _From, StateName, State) ->
- Reply = ok,
- {reply, Reply, StateName, State}.
-
-handle_info({'DOWN', MRef, _, _, _Info}, StateName, State = #state{reader_m_ref = MRef,
- config = Config})
- when StateName =/= close_sent ->
- %% reader has gone down and we are not already shutting down
- ok = notify_closed(Config, shutdown),
- {stop, normal, State};
-handle_info(Info, StateName, State) ->
- error_logger:info_msg("Conn handle_info ~p ~p~n", [Info, StateName]),
- {next_state, StateName, State}.
+ State#state{sessions_sup = SessionsSup,
+ reader_m_ref = ReaderMRef,
+ reader = Reader}.
terminate(Reason, _StateName, #state{connection_sup = Sup,
config = Config}) ->
diff --git a/deps/amqp10_client/src/amqp10_client_frame_reader.erl b/deps/amqp10_client/src/amqp10_client_frame_reader.erl
index 5cc14a2047..524ead07ee 100644
--- a/deps/amqp10_client/src/amqp10_client_frame_reader.erl
+++ b/deps/amqp10_client/src/amqp10_client_frame_reader.erl
@@ -196,13 +196,11 @@ maybe_close_socket(undefined) ->
maybe_close_socket(Socket) ->
close_socket(Socket).
--dialyzer({no_fail_call, close_socket/1}).
close_socket({tcp, Socket}) ->
gen_tcp:close(Socket);
close_socket({ssl, Socket}) ->
ssl:close(Socket).
--dialyzer({no_fail_call, set_active_once/1}).
set_active_once(#state{socket = {tcp, Socket}}) ->
ok = inet:setopts(Socket, [{active, once}]);
set_active_once(#state{socket = {ssl, Socket}}) ->
@@ -283,8 +281,8 @@ route_frame(Channel, FrameType, {Performative, Payload} = Frame, State0) ->
State0),
?DBG("FRAME -> ~p ~p~n ~p~n", [Channel, DestinationPid, Performative]),
case Payload of
- <<>> -> ok = gen_fsm:send_event(DestinationPid, Performative);
- _ -> ok = gen_fsm:send_event(DestinationPid, Frame)
+ <<>> -> ok = gen_statem:cast(DestinationPid, Performative);
+ _ -> ok = gen_statem:cast(DestinationPid, Frame)
end,
State.
diff --git a/deps/amqp10_client/src/amqp10_client_session.erl b/deps/amqp10_client/src/amqp10_client_session.erl
index bc5f42fdee..a6e60da65d 100644
--- a/deps/amqp10_client/src/amqp10_client_session.erl
+++ b/deps/amqp10_client/src/amqp10_client_session.erl
@@ -6,22 +6,11 @@
%%
-module(amqp10_client_session).
--behaviour(gen_fsm).
+-behaviour(gen_statem).
-include("amqp10_client.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
--ifdef(nowarn_deprecated_gen_fsm).
--compile({nowarn_deprecated_function,
- [{gen_fsm, reply, 2},
- {gen_fsm, send_all_state_event, 2},
- {gen_fsm, send_event, 2},
- {gen_fsm, start_link, 3},
- {gen_fsm, sync_send_all_state_event, 2},
- {gen_fsm, sync_send_event, 2},
- {gen_fsm, sync_send_event, 3}]}).
--endif.
-
%% Public API.
-export(['begin'/1,
begin_sync/1,
@@ -39,21 +28,21 @@
socket_ready/2
]).
-%% gen_fsm callbacks.
--export([init/1,
- unmapped/2,
+%% gen_statem callbacks
+-export([
+ init/1,
+ terminate/3,
+ code_change/4,
+ callback_mode/0
+ ]).
+
+%% gen_statem state callbacks.
+-export([
unmapped/3,
- begin_sent/2,
begin_sent/3,
- mapped/2,
mapped/3,
- end_sent/2,
- end_sent/3,
- handle_event/3,
- handle_sync_event/4,
- handle_info/3,
- terminate/3,
- code_change/4]).
+ end_sent/3
+ ]).
-define(MAX_SESSION_WINDOW_SIZE, 65535).
-define(DEFAULT_MAX_HANDLE, 16#ffffffff).
@@ -180,31 +169,31 @@ begin_sync(Connection, Timeout) ->
-spec 'end'(pid()) -> ok.
'end'(Pid) ->
- gen_fsm:send_event(Pid, 'end').
+ gen_statem:cast(Pid, 'end').
-spec attach(pid(), attach_args()) -> {ok, link_ref()}.
attach(Session, Args) ->
- {ok, LinkRef} = gen_fsm:sync_send_event(Session, {attach, Args}),
- {ok, LinkRef}.
+ gen_statem:call(Session, {attach, Args}, {dirty_timeout, ?TIMEOUT}).
-spec detach(pid(), link_handle()) -> ok | {error, link_not_found | half_attached}.
detach(Session, Handle) ->
- gen_fsm:sync_send_event(Session, {detach, Handle}).
+ gen_statem:call(Session, {detach, Handle}, {dirty_timeout, ?TIMEOUT}).
-spec transfer(pid(), amqp10_msg:amqp10_msg(), timeout()) ->
ok | {error, insufficient_credit | link_not_found | half_attached}.
transfer(Session, Amqp10Msg, Timeout) ->
[Transfer | Records] = amqp10_msg:to_amqp_records(Amqp10Msg),
- gen_fsm:sync_send_event(Session, {transfer, Transfer, Records}, Timeout).
+ gen_statem:call(Session, {transfer, Transfer, Records},
+ {dirty_timeout, Timeout}).
flow(Session, Handle, Flow, RenewAfter) ->
- gen_fsm:send_event(Session, {flow, Handle, Flow, RenewAfter}).
+ gen_statem:cast(Session, {flow, Handle, Flow, RenewAfter}).
-spec disposition(pid(), link_role(), transfer_id(), transfer_id(), boolean(),
amqp10_client_types:delivery_state()) -> ok.
disposition(Session, Role, First, Last, Settled, DeliveryState) ->
- gen_fsm:sync_send_event(Session, {disposition, Role, First, Last, Settled,
- DeliveryState}).
+ gen_statem:call(Session, {disposition, Role, First, Last, Settled,
+ DeliveryState}, {dirty_timeout, ?TIMEOUT}).
@@ -213,16 +202,18 @@ disposition(Session, Role, First, Last, Settled, DeliveryState) ->
%% -------------------------------------------------------------------
start_link(From, Channel, Reader, ConnConfig) ->
- gen_fsm:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []).
+ gen_statem:start_link(?MODULE, [From, Channel, Reader, ConnConfig], []).
-spec socket_ready(pid(), amqp10_client_connection:amqp10_socket()) -> ok.
socket_ready(Pid, Socket) ->
- gen_fsm:send_event(Pid, {socket_ready, Socket}).
+ gen_statem:cast(Pid, {socket_ready, Socket}).
%% -------------------------------------------------------------------
-%% gen_fsm callbacks.
+%% gen_statem callbacks.
%% -------------------------------------------------------------------
+callback_mode() -> [state_functions].
+
init([FromPid, Channel, Reader, ConnConfig]) ->
process_flag(trap_exit, true),
amqp10_client_frame_reader:register_session(Reader, self(), Channel),
@@ -230,26 +221,25 @@ init([FromPid, Channel, Reader, ConnConfig]) ->
connection_config = ConnConfig},
{ok, unmapped, State}.
-unmapped({socket_ready, Socket}, State) ->
+unmapped(cast, {socket_ready, Socket}, State) ->
State1 = State#state{socket = Socket},
ok = send_begin(State1),
- {next_state, begin_sent, State1}.
-
-unmapped({attach, Attach}, From,
+ {next_state, begin_sent, State1};
+unmapped({call, From}, {attach, Attach},
#state{early_attach_requests = EARs} = State) ->
- {next_state, unmapped,
+ {keep_state,
State#state{early_attach_requests = [{From, Attach} | EARs]}}.
-begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
- next_outgoing_id = {uint, NOI},
- incoming_window = {uint, InWindow},
- outgoing_window = {uint, OutWindow}},
+begin_sent(cast, #'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
+ next_outgoing_id = {uint, NOI},
+ incoming_window = {uint, InWindow},
+ outgoing_window = {uint, OutWindow}},
#state{early_attach_requests = EARs} = State) ->
State1 = State#state{remote_channel = RemoteChannel},
State2 = lists:foldr(fun({From, Attach}, S) ->
{S2, H} = send_attach(fun send/2, Attach, From, S),
- gen_fsm:reply(From, {ok, H}),
+ gen_statem:reply(From, {ok, H}),
S2
end, State1, EARs),
@@ -258,21 +248,20 @@ begin_sent(#'v1_0.begin'{remote_channel = {ushort, RemoteChannel},
{next_state, mapped, State2#state{early_attach_requests = [],
next_incoming_id = NOI,
remote_incoming_window = InWindow,
- remote_outgoing_window = OutWindow}}.
-
-begin_sent({attach, Attach}, From,
- #state{early_attach_requests = EARs} = State) ->
- {next_state, begin_sent,
+ remote_outgoing_window = OutWindow}};
+begin_sent({call, From}, {attach, Attach},
+ #state{early_attach_requests = EARs} = State) ->
+ {keep_state,
State#state{early_attach_requests = [{From, Attach} | EARs]}}.
-mapped('end', State) ->
+mapped(cast, 'end', State) ->
%% We send the first end frame and wait for the reply.
send_end(State),
{next_state, end_sent, State};
-mapped({flow, OutHandle, Flow0, RenewAfter}, State0) ->
+mapped(cast, {flow, OutHandle, Flow0, RenewAfter}, State0) ->
State = send_flow(fun send/2, OutHandle, Flow0, RenewAfter, State0),
{next_state, mapped, State};
-mapped(#'v1_0.end'{error = Err}, State) ->
+mapped(cast, #'v1_0.end'{error = Err}, State) ->
%% We receive the first end frame, reply and terminate.
_ = send_end(State),
% TODO: send notifications for links?
@@ -282,11 +271,11 @@ mapped(#'v1_0.end'{error = Err}, State) ->
end,
ok = notify_session_ended(State, Reason),
{stop, normal, State};
-mapped(#'v1_0.attach'{name = {utf8, Name},
- initial_delivery_count = IDC,
- handle = {uint, InHandle}},
- #state{links = Links, link_index = LinkIndex,
- link_handle_index = LHI} = State0) ->
+mapped(cast, #'v1_0.attach'{name = {utf8, Name},
+ initial_delivery_count = IDC,
+ handle = {uint, InHandle}},
+ #state{links = Links, link_index = LinkIndex,
+ link_handle_index = LHI} = State0) ->
#{Name := OutHandle} = LinkIndex,
#{OutHandle := Link0} = Links,
@@ -302,8 +291,9 @@ mapped(#'v1_0.attach'{name = {utf8, Name},
link_index = maps:remove(Name, LinkIndex),
link_handle_index = LHI#{InHandle => OutHandle}},
{next_state, mapped, State};
-
-mapped(#'v1_0.detach'{handle = {uint, InHandle}, closed = Closed, error = Err},
+mapped(cast, #'v1_0.detach'{handle = {uint, InHandle},
+ closed = Closed,
+ error = Err},
#state{links = Links, link_handle_index = LHI} = State0)
when Closed =:= true orelse Closed =:= undefined ->
with_link(InHandle, State0,
@@ -317,11 +307,10 @@ mapped(#'v1_0.detach'{handle = {uint, InHandle}, closed = Closed, error = Err},
State#state{links = maps:remove(OutHandle, Links),
link_handle_index = maps:remove(InHandle, LHI)}}
end);
-
-mapped(#'v1_0.flow'{handle = undefined} = Flow, State0) ->
+mapped(cast, #'v1_0.flow'{handle = undefined} = Flow, State0) ->
State = handle_session_flow(Flow, State0),
{next_state, mapped, State};
-mapped(#'v1_0.flow'{handle = {uint, InHandle}} = Flow,
+mapped(cast, #'v1_0.flow'{handle = {uint, InHandle}} = Flow,
#state{links = Links} = State0) ->
State = handle_session_flow(Flow, State0),
@@ -335,8 +324,7 @@ mapped(#'v1_0.flow'{handle = {uint, InHandle}} = Flow,
Links1 = Links#{OutHandle => Link},
State1 = State#state{links = Links1},
{next_state, mapped, State1};
-
-mapped({#'v1_0.transfer'{handle = {uint, InHandle},
+mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
more = true} = Transfer, Payload},
#state{links = Links} = State0) ->
@@ -348,7 +336,7 @@ mapped({#'v1_0.transfer'{handle = {uint, InHandle},
State = book_partial_transfer_received(
State0#state{links = Links#{OutHandle => Link1}}),
{next_state, mapped, State};
-mapped({#'v1_0.transfer'{handle = {uint, InHandle},
+mapped(cast, {#'v1_0.transfer'{handle = {uint, InHandle},
delivery_id = MaybeDeliveryId,
settled = Settled} = Transfer0, Payload0},
#state{incoming_unsettled = Unsettled0} = State0) ->
@@ -395,9 +383,10 @@ mapped({#'v1_0.transfer'{handle = {uint, InHandle},
% role=true indicates the disposition is from a `receiver`. i.e. from the
% clients point of view these are dispositions relating to `sender`links
-mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First},
+mapped(cast, #'v1_0.disposition'{role = true, settled = true, first = {uint, First},
last = Last0, state = DeliveryState},
#state{unsettled = Unsettled0} = State) ->
+ ct:pal("DISPOSITION in SESS ~p", [Unsettled0]),
Last = case Last0 of
undefined -> First;
{uint, L} -> L
@@ -416,55 +405,54 @@ mapped(#'v1_0.disposition'{role = true, settled = true, first = {uint, First},
end, Unsettled0, lists:seq(First, Last)),
{next_state, mapped, State#state{unsettled = Unsettled}};
-mapped(Frame, State) ->
+mapped(cast, Frame, State) ->
error_logger:warning_msg("Unhandled session frame ~p in state ~p~n",
[Frame, State]),
- {next_state, mapped, State}.
-
-%% mapped/3
-%% TODO:: settled = false is probably dependent on the snd_settle_mode
-%% e.g. if snd_settle_mode = settled it may not be valid to send an
-%% unsettled transfer. Who knows really?
-%%
-%% Transfer. See spec section: 2.6.12
-mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
+ {next_state, mapped, State};
+mapped({call, From},
+ {transfer, #'v1_0.transfer'{handle = {uint, OutHandle},
delivery_tag = {binary, DeliveryTag},
settled = false} = Transfer0, Parts},
- From, #state{next_outgoing_id = NOI, links = Links,
- unsettled = Unsettled} = State) ->
+ #state{next_outgoing_id = NOI, links = Links,
+ unsettled = Unsettled} = State) ->
case Links of
#{OutHandle := #link{input_handle = undefined}} ->
- {reply, {error, half_attached}, mapped, State};
+ {keep_state, State, [{reply, From, {error, half_attached}}]};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
- {reply, {error, insufficient_credit}, mapped, State};
+ {keep_state, State, [{reply, From, {error, insufficient_credit}}]};
#{OutHandle := Link} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NOI),
resume = false},
{ok, NumFrames} = send_transfer(Transfer, Parts, State),
State1 = State#state{unsettled = Unsettled#{NOI => {DeliveryTag, From}}},
- {reply, ok, mapped, book_transfer_send(NumFrames, Link, State1)};
+ {keep_state, book_transfer_send(NumFrames, Link, State1),
+ [{reply, From, ok}]};
_ ->
- {reply, {error, link_not_found}, mapped, State}
+ {keep_state, State, [{reply, From, {error, link_not_found}}]}
+
end;
-mapped({transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
- Parts}, _From, #state{next_outgoing_id = NOI,
- links = Links} = State) ->
+mapped({call, From},
+ {transfer, #'v1_0.transfer'{handle = {uint, OutHandle}} = Transfer0,
+ Parts}, #state{next_outgoing_id = NOI,
+ links = Links} = State) ->
case Links of
#{OutHandle := #link{input_handle = undefined}} ->
- {reply, {error, half_attached}, mapped, State};
+ {keep_state_and_data, [{reply, From, {error, half_attached}}]};
#{OutHandle := #link{link_credit = LC}} when LC =< 0 ->
- {reply, {error, insufficient_credit}, mapped, State};
+ {keep_state_and_data, [{reply, From, {error, insufficient_credit}}]};
#{OutHandle := Link} ->
Transfer = Transfer0#'v1_0.transfer'{delivery_id = uint(NOI)},
{ok, NumFrames} = send_transfer(Transfer, Parts, State),
% TODO look into if erlang will correctly wrap integers during
% binary conversion.
- {reply, ok, mapped, book_transfer_send(NumFrames, Link, State)};
+ {keep_state, book_transfer_send(NumFrames, Link, State),
+ [{reply, From, ok}]};
_ ->
- {reply, {error, link_not_found}, mapped, State}
+ {keep_state, [{reply, From, {error, link_not_found}}]}
end;
-mapped({disposition, Role, First, Last, Settled0, DeliveryState}, _From,
+mapped({call, From},
+ {disposition, Role, First, Last, Settled0, DeliveryState},
#state{incoming_unsettled = Unsettled0,
links = Links0} = State0) ->
Disposition =
@@ -494,36 +482,27 @@ mapped({disposition, Role, First, Last, Settled0, DeliveryState}, _From,
Res = send(Disposition, State),
- {reply, Res, mapped, State};
+ {keep_state, State, [{reply, From, Res}]};
-mapped({attach, Attach}, From, State) ->
+mapped({call, From}, {attach, Attach}, State) ->
{State1, LinkRef} = send_attach(fun send/2, Attach, From, State),
- {reply, {ok, LinkRef}, mapped, State1};
+ {keep_state, State1, [{reply, From, {ok, LinkRef}}]};
-mapped(Msg, From, State) ->
+mapped({call, From}, Msg, State) ->
{Reply, State1} = send_detach(fun send/2, Msg, From, State),
- {reply, Reply, mapped, State1}.
+ {keep_state, State1, [{reply, From, Reply}]};
-end_sent(#'v1_0.end'{}, State) ->
- {stop, normal, State};
-end_sent(_Frame, State) ->
- % just drop frames here
- {next_state, end_sent, State}.
+mapped(_EvtType, Msg, _State) ->
+ error_logger:info_msg("amqp10_session: unhandled msg in mapped state ~W",
+ [Msg, 10]),
+ keep_state_and_data.
-end_sent(_Frame, _From, State) ->
+end_sent(_EvtType, #'v1_0.end'{}, State) ->
+ {stop, normal, State};
+end_sent(_EvtType, _Frame, State) ->
% just drop frames here
{next_state, end_sent, State}.
-handle_event(_Event, StateName, State) ->
- {next_state, StateName, State}.
-
-handle_sync_event(_Event, _From, StateName, State) ->
- Reply = ok,
- {reply, Reply, StateName, State}.
-
-handle_info(_Info, StateName, State) ->
- {next_state, StateName, State}.
-
terminate(Reason, _StateName, #state{channel = Channel,
remote_channel = RemoteChannel,
reader = Reader}) ->
@@ -890,13 +869,14 @@ notify_session_ended(#state{notify = Pid}, Reason) ->
ok.
notify_disposition({Pid, _}, SessionDeliveryTag) ->
+ ct:pal("notify_disposition to ~w", [ Pid]),
Pid ! {amqp10_disposition, SessionDeliveryTag},
ok.
book_transfer_send(Num, #link{output_handle = Handle} = Link,
- #state{next_outgoing_id = NOI,
- remote_incoming_window = RIW,
- links = Links} = State) ->
+ #state{next_outgoing_id = NOI,
+ remote_incoming_window = RIW,
+ links = Links} = State) ->
State#state{next_outgoing_id = NOI+Num,
remote_incoming_window = RIW-Num,
links = Links#{Handle => incr_link_counters(Link)}}.
diff --git a/deps/amqp10_client/test/system_SUITE.erl b/deps/amqp10_client/test/system_SUITE.erl
index b14859cb15..d81d48c4e3 100644
--- a/deps/amqp10_client/test/system_SUITE.erl
+++ b/deps/amqp10_client/test/system_SUITE.erl
@@ -259,6 +259,7 @@ open_connection_plain_sasl_failure(Config) ->
ok = amqp10_client:close_connection(Connection).
basic_roundtrip(Config) ->
+ application:start(sasl),
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
OpenConf = #{address => Hostname, port => Port, sasl => anon},
@@ -473,6 +474,7 @@ transfer_unsettled(Config) ->
DeliveryTag = <<"my-tag">>,
Msg = amqp10_msg:new(DeliveryTag, Data, false),
ok = amqp10_client:send_msg(Sender, Msg),
+ ct:pal("test pid ~w", [self()]),
ok = await_disposition(DeliveryTag),
{ok, Receiver} = amqp10_client:attach_receiver_link(Session,
<<"data-receiver">>,
@@ -506,7 +508,8 @@ subscribe(Config) ->
{amqp10_event, {link, Receiver, credit_exhausted}} ->
ok
after 5000 ->
- exit(credit_exhausted_assert)
+ flush(),
+ exit(credit_exhausted_assert)
end,
ok = amqp10_client:end_session(Session),
@@ -708,7 +711,9 @@ receive_one(Receiver) ->
await_disposition(DeliveryTag) ->
receive
{amqp10_disposition, {accepted, DeliveryTag}} -> ok
- after 3000 -> exit(dispostion_timeout)
+ after 3000 ->
+ flush(),
+ exit(dispostion_timeout)
end.
await_link(Who, What, Err) ->
@@ -717,8 +722,19 @@ await_link(Who, What, Err) ->
ok;
{amqp10_event, {link, Who, {detached, Why}}} ->
exit(Why)
- after 5000 -> exit(Err)
+ after 5000 ->
+ flush(),
+ exit(Err)
end.
to_bin(X) when is_list(X) ->
list_to_binary(X).
+
+flush() ->
+ receive
+ Any ->
+ ct:pal("flush ~p", [Any]),
+ flush()
+ after 0 ->
+ ok
+ end.