summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_stomp/test/src/rabbit_stomp_client.erl
blob: 739512e3b3a8144fa9b94a347641b791146931a7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%

%% The stupidest client imaginable, just for testing.

-module(rabbit_stomp_client).

-export([connect/1, connect/2, connect/4, connect/5, disconnect/1, send/2, send/3, send/4, recv/1]).

-include("rabbit_stomp_frame.hrl").

-define(TIMEOUT, 1000). % milliseconds

connect(Port)  -> connect0([], "guest", "guest", Port, []).
connect(V, Port) -> connect0([{"accept-version", V}], "guest", "guest", Port, []).
connect(V, Login, Pass, Port) -> connect0([{"accept-version", V}], Login, Pass, Port, []).
connect(V, Login, Pass, Port, Headers) -> connect0([{"accept-version", V}], Login, Pass, Port, Headers).

connect0(Version, Login, Pass, Port, Headers) ->
    %% The default port is 61613 but it's in the middle of the ephemeral
    %% ports range on many operating systems. Therefore, there is a
    %% chance this port is already in use. Let's use a port close to the
    %% AMQP default port.
    {ok, Sock} = gen_tcp:connect(localhost, Port, [{active, false}, binary]),
    Client0 = recv_state(Sock),
    send(Client0, "CONNECT", [{"login", Login},
                              {"passcode", Pass} | Version] ++ Headers),
    {#stomp_frame{command = "CONNECTED"}, Client1} = recv(Client0),
    {ok, Client1}.

disconnect(Client = {Sock, _}) ->
    send(Client, "DISCONNECT"),
    gen_tcp:close(Sock).

send(Client, Command) ->
    send(Client, Command, []).

send(Client, Command, Headers) ->
    send(Client, Command, Headers, []).

send({Sock, _}, Command, Headers, Body) ->
    Frame = rabbit_stomp_frame:serialize(
              #stomp_frame{command     = list_to_binary(Command),
                           headers     = Headers,
                           body_iolist = Body}),
    gen_tcp:send(Sock, Frame).

recv_state(Sock) ->
    {Sock, []}.

recv({_Sock, []} = Client) ->
    recv(Client, rabbit_stomp_frame:initial_state(), 0);
recv({Sock, [Frame | Frames]}) ->
    {Frame, {Sock, Frames}}.

recv(Client = {Sock, _}, FrameState, Length) ->
    {ok, Payload} = gen_tcp:recv(Sock, Length, ?TIMEOUT),
    parse(Payload, Client, FrameState, Length).

parse(Payload, Client = {Sock, FramesRev}, FrameState, Length) ->
    case rabbit_stomp_frame:parse(Payload, FrameState) of
        {ok, Frame, <<>>} ->
            recv({Sock, lists:reverse([Frame | FramesRev])});
        {ok, Frame, <<"\n">>} ->
            recv({Sock, lists:reverse([Frame | FramesRev])});
        {ok, Frame, Rest} ->
            parse(Rest, {Sock, [Frame | FramesRev]},
                  rabbit_stomp_frame:initial_state(), Length);
        {more, NewState} ->
            recv(Client, NewState, 0)
    end.