diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-01-30 13:13:26 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-01-30 13:13:26 +0100 |
commit | 5591c6fb783f3feac42f8d8ba854b094cc9b2cbb (patch) | |
tree | 83c7ce59254aa780fc076d3d266d10d827a569d8 | |
parent | b393653c34d218c1bfbe6d99a0a7b395149858aa (diff) | |
download | rabbitmq-server-git-5591c6fb783f3feac42f8d8ba854b094cc9b2cbb.tar.gz |
Test streaming after recovery
-rw-r--r-- | test/rabbit_stream2_queue_SUITE.erl | 81 |
1 files changed, 74 insertions, 7 deletions
diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl index 3e09d56c6f..8376cdbc13 100644 --- a/test/rabbit_stream2_queue_SUITE.erl +++ b/test/rabbit_stream2_queue_SUITE.erl @@ -50,7 +50,8 @@ all_tests() -> roundtrip, time_travel, idempotent_declare_queue, - delete_queue + delete_queue, + zenflix ]. %% ------------------------------------------------------------------- @@ -134,7 +135,7 @@ merge_app_env(Config) -> {ra, [{min_wal_roll_over_interval, 30000}]}). end_per_testcase(Testcase, Config) -> - rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + %% rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), Config1 = rabbit_ct_helpers:run_steps( Config, rabbit_ct_client_helpers:teardown_steps()), @@ -361,6 +362,45 @@ cluster_delete_queue(Config) -> flush(100), ok. +zenflix(Config) -> + %% Let's stream our own movie, no sequence should be out of order! + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch1, QName, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + + publish_stream(Ch1, QName, 1, 100), + + consume_stream(<<"ctag1">>, Ch2, QName, 0, 100), + consume_stream(<<"ctag2">>, Ch2, QName, 35, 100), + + [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers], + [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)], + + Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch4 = rabbit_ct_client_helpers:open_channel(Config, Server), + + consume_stream(<<"ctag3">>, Ch3, QName, 0, 100), + consume_stream(<<"ctag4">>, Ch3, QName, 98, 100), + consume_stream(<<"ctag5">>, Ch3, QName, 35, 100), + consume_stream(<<"ctag6">>, Ch3, QName, 10, 100), + consume_stream(<<"ctag7">>, Ch3, QName, 89, 100), + consume_stream(<<"ctag8">>, Ch3, QName, 100, 100), + + publish_stream(Ch4, QName, 101, 200), + + consume_stream(<<"ctag8">>, Ch3, QName, 75, 200), + + flush(100), + ok. + +%% HELPERS + assert_declare(Ch, QName, Type) -> ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, Type}])), @@ -380,19 +420,17 @@ assert_deleted(Config, StreamsDir1, StreamsDir2, StreamsDir3) -> ?assertMatch({ok, []}, file:list_dir(StreamsDir2)), ?assertMatch({ok, []}, file:list_dir(StreamsDir3)). -%% HELPERS - publish_confirm(Ch, QName, Msg) -> publish(Ch, QName, Msg), amqp_channel:register_confirm_handler(Ch, self()), - ct:pal("waiting for confirms from ~s", [QName]), + %% ct:pal("waiting for confirms from ~s", [QName]), ok = receive #'basic.ack'{} -> ok; #'basic.nack'{} -> fail after 2500 -> - exit(confirm_timeout) + exit({confirm_timeout, Msg}) end, - ct:pal("CONFIRMED! ~s", [QName]), + %% ct:pal("CONFIRMED! ~s", [QName]), ok. publish_many(Ch, Queue, Count) -> @@ -443,3 +481,32 @@ flush(T) -> delete_queues() -> [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()]. + +publish_stream(Ch, QName, From, To) -> + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + [begin + Msg = <<S:64>>, + publish_confirm(Ch, QName, Msg) + end || S <- lists:seq(From, To)]. + +consume_stream(CTag, Ch, QName, Offset, Upto) -> + subscribe(Ch, CTag, QName, 1, [{<<"x-stream-offset">>, long, Offset}]), + receive_stream(Ch, CTag, Offset + 1, Upto + 1), + cancel(Ch, CTag). + +receive_stream(_, _, From, From) -> + ok; +receive_stream(Ch, CTag, From, To) -> + Msg = <<From:64>>, + receive + {#'basic.deliver'{delivery_tag = DT, + consumer_tag = CTag, + redelivered = false}, + #amqp_msg{payload = Msg}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT, + multiple = false}), + receive_stream(Ch, CTag, From + 1, To) + after 2000 -> + flush(100), + exit({receive_stream_timeout, CTag, From}) + end. |