summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_web_stomp/test/amqp_stomp_SUITE.erl
blob: 695d993149273cb7a3c6b412ae038e5fd775bc1b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates.  All rights reserved.
%%
-module(amqp_stomp_SUITE).

-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-define(QUEUE, <<"TestQueue">>).
-define(DESTINATION, "/amq/queue/TestQueue").

all() ->
    [
    pubsub_amqp
    ].

init_per_suite(Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config,
                                           [{rmq_nodename_suffix, ?MODULE},
                                            {protocol, "ws"}]),
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config1,
        rabbit_ct_broker_helpers:setup_steps()).

end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config,
      rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(Testcase, Config) ->
    Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
    {ok, Connection} = amqp_connection:start(#amqp_params_direct{
        node = rabbit_ct_broker_helpers:get_node_config(Config1, 0, nodename)
    }),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    rabbit_ct_helpers:set_config(Config1, [
        {amqp_connection, Connection},
        {amqp_channel, Channel}
    ]).

end_per_testcase(Testcase, Config) ->
    Connection = ?config(amqp_connection, Config),
    Channel = ?config(amqp_channel, Config),
    amqp_channel:close(Channel),
    amqp_connection:close(Connection),
    rabbit_ct_helpers:testcase_finished(Config, Testcase).


raw_send(WS, Command, Headers) ->
    raw_send(WS, Command, Headers, <<>>).
raw_send(WS, Command, Headers, Body) ->
    Frame = stomp:marshal(Command, Headers, Body),
    rfc6455_client:send(WS, Frame).

raw_recv(WS) ->
    {ok, P} = rfc6455_client:recv(WS),
    stomp:unmarshal(P).


pubsub_amqp(Config) ->
    Ch = ?config(amqp_channel, Config),
    #'queue.declare_ok'{} =
        amqp_channel:call(Ch, #'queue.declare'{queue = ?QUEUE, auto_delete = true}),

    PortStr = rabbit_ws_test_util:get_web_stomp_port_str(Config),
    Protocol = ?config(protocol, Config),
    WS = rfc6455_client:new(Protocol ++ "://127.0.0.1:" ++ PortStr ++ "/ws", self()),
    {ok, _} = rfc6455_client:open(WS),
    ok = raw_send(WS, "CONNECT", [{"login", "guest"}, {"passcode", "guest"}]),

    {<<"CONNECTED">>, _, <<>>} = raw_recv(WS),

    ok = raw_send(WS, "SUBSCRIBE", [{"destination", ?DESTINATION},
                                    {"id", "pubsub_amqp"},
                                    {"x-queue-name", ?QUEUE}]),

    CHK1 = <<"x-custom-hdr-1">>,
    CHV1 = <<"value1">>,
    CH1 = {CHK1, longstr, CHV1},
    CHK2 = <<"x-custom-hdr-2">>,
    CHV2 = <<"value2">>,
    CH2 = {CHK2, longstr, CHV2},
    CHK3 = <<"custom-hdr-3">>,
    CHV3 = <<"value3">>,
    CH3 = {CHK3, longstr, <<"value3">>},

    Publish = #'basic.publish'{exchange = <<"">>, routing_key = ?QUEUE},
    Props = #'P_basic'{headers = [CH1, CH2, CH3]},
    amqp_channel:call(Ch, Publish, #amqp_msg{props = Props, payload = <<"a\x00a">>}),

    {<<"MESSAGE">>, H, <<"a\x00a">>} = raw_recv(WS),

    {close, _} = rfc6455_client:close(WS),

    "/queue/TestQueue" = binary_to_list(proplists:get_value(<<"destination">>, H)),
    {CHK1, CHV1} = {CHK1, proplists:get_value(CHK1, H)},
    {CHK2, CHV2} = {CHK2, proplists:get_value(CHK2, H)},
    {CHK3, CHV3} = {CHK3, proplists:get_value(CHK3, H)},
    ok.