summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl')
-rw-r--r--deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl75
1 files changed, 75 insertions, 0 deletions
diff --git a/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
new file mode 100644
index 0000000000..739512e3b3
--- /dev/null
+++ b/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
@@ -0,0 +1,75 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% The stupidest client imaginable, just for testing.
+
+-module(rabbit_stomp_client).
+
+-export([connect/1, connect/2, connect/4, connect/5, disconnect/1, send/2, send/3, send/4, recv/1]).
+
+-include("rabbit_stomp_frame.hrl").
+
+-define(TIMEOUT, 1000). % milliseconds
+
+connect(Port) -> connect0([], "guest", "guest", Port, []).
+connect(V, Port) -> connect0([{"accept-version", V}], "guest", "guest", Port, []).
+connect(V, Login, Pass, Port) -> connect0([{"accept-version", V}], Login, Pass, Port, []).
+connect(V, Login, Pass, Port, Headers) -> connect0([{"accept-version", V}], Login, Pass, Port, Headers).
+
+connect0(Version, Login, Pass, Port, Headers) ->
+ %% The default port is 61613 but it's in the middle of the ephemeral
+ %% ports range on many operating systems. Therefore, there is a
+ %% chance this port is already in use. Let's use a port close to the
+ %% AMQP default port.
+ {ok, Sock} = gen_tcp:connect(localhost, Port, [{active, false}, binary]),
+ Client0 = recv_state(Sock),
+ send(Client0, "CONNECT", [{"login", Login},
+ {"passcode", Pass} | Version] ++ Headers),
+ {#stomp_frame{command = "CONNECTED"}, Client1} = recv(Client0),
+ {ok, Client1}.
+
+disconnect(Client = {Sock, _}) ->
+ send(Client, "DISCONNECT"),
+ gen_tcp:close(Sock).
+
+send(Client, Command) ->
+ send(Client, Command, []).
+
+send(Client, Command, Headers) ->
+ send(Client, Command, Headers, []).
+
+send({Sock, _}, Command, Headers, Body) ->
+ Frame = rabbit_stomp_frame:serialize(
+ #stomp_frame{command = list_to_binary(Command),
+ headers = Headers,
+ body_iolist = Body}),
+ gen_tcp:send(Sock, Frame).
+
+recv_state(Sock) ->
+ {Sock, []}.
+
+recv({_Sock, []} = Client) ->
+ recv(Client, rabbit_stomp_frame:initial_state(), 0);
+recv({Sock, [Frame | Frames]}) ->
+ {Frame, {Sock, Frames}}.
+
+recv(Client = {Sock, _}, FrameState, Length) ->
+ {ok, Payload} = gen_tcp:recv(Sock, Length, ?TIMEOUT),
+ parse(Payload, Client, FrameState, Length).
+
+parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) ->
+ case rabbit_stomp_frame:parse(Payload, FrameState) of
+ {ok, Frame, <<>>} ->
+ recv({Sock, lists:reverse([Frame | FramesRev])});
+ {ok, Frame, <<"\n">>} ->
+ recv({Sock, lists:reverse([Frame | FramesRev])});
+ {ok, Frame, Rest} ->
+ parse(Rest, {Sock, [Frame | FramesRev]},
+ rabbit_stomp_frame:initial_state(), Length);
+ {more, NewState} ->
+ recv(Client, NewState, 0)
+ end.