summaryrefslogtreecommitdiff
path: root/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl
blob: db70c8e45f2910d05204d7b177bae6dd484f7ee7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2018-2023 VMware, Inc. or its affiliates.  All rights reserved.

-module(rabbit_message_interceptor_SUITE).

-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").

-compile([nowarn_export_all, export_all]).

-import(rabbit_ct_helpers, [eventually/1]).

%% Common Test callback: the suite runs exactly one group, `tests`.
all() ->
    [{group, tests}].

%% Common Test callback: defines the `tests` group, which runs both
%% header test cases in shuffled order.
groups() ->
    Cases = [headers_overwrite, headers_no_overwrite],
    [{tests, [shuffle], Cases}].

%% Suite-level setup: calls rabbit_ct_helpers:log_environment/0 and then
%% returns whatever rabbit_ct_helpers:run_setup_steps/1 yields for the
%% given Common Test configuration.
init_per_suite(SuiteConfig) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(SuiteConfig).

%% Suite-level teardown: hands the configuration to
%% rabbit_ct_helpers:run_teardown_steps/1 and returns its result.
end_per_suite(SuiteConfig) ->
    rabbit_ct_helpers:run_teardown_steps(SuiteConfig).

%% Per-testcase setup.
%%
%% Uses the test case name as `rmq_nodename_suffix`, sets the `rabbit`
%% application env key `incoming_message_interceptors` so that both
%% `set_header_timestamp` and `set_header_routing_node` map to the overwrite
%% flag of the test case, and then runs the broker and client setup steps.
%%
%% Only `headers_overwrite` (flag `true`) and `headers_no_overwrite`
%% (flag `false`) are accepted; any other test case name crashes with
%% `case_clause`.
init_per_testcase(Testcase, Config0) ->
    Overwrite = case Testcase of
                    headers_overwrite -> true;
                    headers_no_overwrite -> false
                end,
    %% NOTE(review): headers/2 matches the message headers in a fixed order,
    %% which may depend on the order of this list -- confirm before replacing
    %% maps:to_list/1 with a hand-written list.
    Interceptors = maps:to_list(
                     maps:from_keys([set_header_timestamp,
                                     set_header_routing_node],
                                    Overwrite)),
    NodeConfig = rabbit_ct_helpers:set_config(
                   Config0, [{rmq_nodename_suffix, Testcase}]),
    AppEnv = {rabbit, [{incoming_message_interceptors, Interceptors}]},
    BrokerConfig = rabbit_ct_helpers:merge_app_env(NodeConfig, AppEnv),
    SetupSteps = rabbit_ct_broker_helpers:setup_steps() ++
                 rabbit_ct_client_helpers:setup_steps(),
    rabbit_ct_helpers:run_steps(BrokerConfig, SetupSteps).

%% Per-testcase teardown: marks the test case as finished via
%% rabbit_ct_helpers:testcase_finished/2, then runs the client teardown
%% steps followed by the broker teardown steps.
end_per_testcase(Testcase, Config0) ->
    FinishedConfig = rabbit_ct_helpers:testcase_finished(Config0, Testcase),
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    rabbit_ct_helpers:run_teardown_steps(FinishedConfig, TeardownSteps).

%% Test case: runs the shared headers/2 scenario with the overwrite flag
%% set to `true`.
headers_overwrite(CtConfig) ->
    headers(true, CtConfig).

%% Test case: runs the shared headers/2 scenario with the overwrite flag
%% set to `false`.
headers_no_overwrite(CtConfig) ->
    headers(false, CtConfig).

%% Shared scenario behind headers_overwrite/1 and headers_no_overwrite/1.
%%
%% Overwrite :: boolean() - which expectation applies to the second message.
%% Config    :: Common Test configuration used to reach node 0.
%%
%% Steps:
%%  1. Declare a queue named after this function and publish a message
%%     without properties. Expect it to come back with a `timestamp`
%%     property and the headers `timestamp_in_ms` and `x-routed-by`,
%%     where both time values lie within 4 seconds of the time sampled at
%%     the start of the test and `x-routed-by` equals node 0's name.
%%  2. Publish a message that already carries those fields. With
%%     Overwrite =:= true the same expectation as in step 1 must hold;
%%     with Overwrite =:= false the message must come back unchanged.
%%  3. Delete the queue and return `ok`.
headers(Overwrite, Config) ->
    NodeName = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    Server = atom_to_binary(NodeName),
    %% The queue name doubles as the message payload.
    QName = atom_to_binary(?FUNCTION_NAME),
    Payload = QName,
    NowSecs = os:system_time(second),
    NowMs = os:system_time(millisecond),
    Chan = rabbit_ct_client_helpers:open_channel(Config),
    Publish = #'basic.publish'{routing_key = QName},
    Get = #'basic.get'{queue = QName},
    #'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = QName}),
    amqp_channel:call(Chan, Publish, #amqp_msg{payload = Payload}),

    %% Expectation: the fetched message carries broker-set time and node values.
    ExpectStamped =
        fun() ->
                eventually(
                  ?_assertMatch(
                     {#'basic.get_ok'{},
                      #amqp_msg{payload = Payload,
                                props = #'P_basic'{
                                           timestamp = Secs,
                                           headers = [{<<"timestamp_in_ms">>, long, Ms},
                                                      {<<"x-routed-by">>, longstr, Server}]
                                          }}}
                       when Ms < NowMs + 4000 andalso
                            Ms > NowMs - 4000 andalso
                            Secs < NowSecs + 4 andalso
                            Secs > NowSecs - 4,
                     amqp_channel:call(Chan, Get)))
        end,
    ExpectStamped(),

    %% Second message: the client sets the same fields itself.
    Preset = #amqp_msg{payload = Payload,
                       props = #'P_basic'{
                                  timestamp = 1,
                                  headers = [{<<"timestamp_in_ms">>, long, 1000},
                                             {<<"x-routed-by">>, longstr, <<"rabbit@my-node">>}]
                                 }},
    %% Expectation: the fetched message is exactly what the client published.
    ExpectUntouched =
        fun() ->
                eventually(
                  ?_assertMatch(
                     {#'basic.get_ok'{}, Preset},
                     amqp_channel:call(Chan, Get)))
        end,
    amqp_channel:call(Chan, Publish, Preset),
    Check = case Overwrite of
                true -> ExpectStamped;
                false -> ExpectUntouched
            end,
    Check(),

    #'queue.delete_ok'{} = amqp_channel:call(Chan, #'queue.delete'{queue = QName}),
    ok.