summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-01-30 13:13:26 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-01-30 13:13:26 +0100
commit5591c6fb783f3feac42f8d8ba854b094cc9b2cbb (patch)
tree83c7ce59254aa780fc076d3d266d10d827a569d8
parentb393653c34d218c1bfbe6d99a0a7b395149858aa (diff)
downloadrabbitmq-server-git-5591c6fb783f3feac42f8d8ba854b094cc9b2cbb.tar.gz
Test streaming after recovery
-rw-r--r--test/rabbit_stream2_queue_SUITE.erl81
1 files changed, 74 insertions, 7 deletions
diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl
index 3e09d56c6f..8376cdbc13 100644
--- a/test/rabbit_stream2_queue_SUITE.erl
+++ b/test/rabbit_stream2_queue_SUITE.erl
@@ -50,7 +50,8 @@ all_tests() ->
roundtrip,
time_travel,
idempotent_declare_queue,
- delete_queue
+ delete_queue,
+ zenflix
].
%% -------------------------------------------------------------------
@@ -134,7 +135,7 @@ merge_app_env(Config) ->
{ra, [{min_wal_roll_over_interval, 30000}]}).
end_per_testcase(Testcase, Config) ->
- rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ %% rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
Config1 = rabbit_ct_helpers:run_steps(
Config,
rabbit_ct_client_helpers:teardown_steps()),
@@ -361,6 +362,45 @@ cluster_delete_queue(Config) ->
flush(100),
ok.
+zenflix(Config) ->
+ %% Let's stream our own movie, no sequence should be out of order!
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QName = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch1, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+
+ publish_stream(Ch1, QName, 1, 100),
+
+ consume_stream(<<"ctag1">>, Ch2, QName, 0, 100),
+ consume_stream(<<"ctag2">>, Ch2, QName, 35, 100),
+
+ [rabbit_ct_broker_helpers:stop_node(Config, S) || S <- Servers],
+ [rabbit_ct_broker_helpers:start_node(Config, S) || S <- lists:reverse(Servers)],
+
+ Ch3 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch4 = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ consume_stream(<<"ctag3">>, Ch3, QName, 0, 100),
+ consume_stream(<<"ctag4">>, Ch3, QName, 98, 100),
+ consume_stream(<<"ctag5">>, Ch3, QName, 35, 100),
+ consume_stream(<<"ctag6">>, Ch3, QName, 10, 100),
+ consume_stream(<<"ctag7">>, Ch3, QName, 89, 100),
+ consume_stream(<<"ctag8">>, Ch3, QName, 100, 100),
+
+ publish_stream(Ch4, QName, 101, 200),
+
+ consume_stream(<<"ctag8">>, Ch3, QName, 75, 200),
+
+ flush(100),
+ ok.
+
+%% HELPERS
+
assert_declare(Ch, QName, Type) ->
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr, Type}])),
@@ -380,19 +420,17 @@ assert_deleted(Config, StreamsDir1, StreamsDir2, StreamsDir3) ->
?assertMatch({ok, []}, file:list_dir(StreamsDir2)),
?assertMatch({ok, []}, file:list_dir(StreamsDir3)).
-%% HELPERS
-
publish_confirm(Ch, QName, Msg) ->
publish(Ch, QName, Msg),
amqp_channel:register_confirm_handler(Ch, self()),
- ct:pal("waiting for confirms from ~s", [QName]),
+ %% ct:pal("waiting for confirms from ~s", [QName]),
ok = receive
#'basic.ack'{} -> ok;
#'basic.nack'{} -> fail
after 2500 ->
- exit(confirm_timeout)
+ exit({confirm_timeout, Msg})
end,
- ct:pal("CONFIRMED! ~s", [QName]),
+ %% ct:pal("CONFIRMED! ~s", [QName]),
ok.
publish_many(Ch, Queue, Count) ->
@@ -443,3 +481,32 @@ flush(T) ->
delete_queues() ->
[rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
|| Q <- rabbit_amqqueue:list()].
+
+publish_stream(Ch, QName, From, To) ->
+ #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
+ [begin
+ Msg = <<S:64>>,
+ publish_confirm(Ch, QName, Msg)
+ end || S <- lists:seq(From, To)].
+
+consume_stream(CTag, Ch, QName, Offset, Upto) ->
+ subscribe(Ch, CTag, QName, 1, [{<<"x-stream-offset">>, long, Offset}]),
+ receive_stream(Ch, CTag, Offset + 1, Upto + 1),
+ cancel(Ch, CTag).
+
+receive_stream(_, _, From, From) ->
+ ok;
+receive_stream(Ch, CTag, From, To) ->
+ Msg = <<From:64>>,
+ receive
+ {#'basic.deliver'{delivery_tag = DT,
+ consumer_tag = CTag,
+ redelivered = false},
+ #amqp_msg{payload = Msg}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DT,
+ multiple = false}),
+ receive_stream(Ch, CTag, From + 1, To)
+ after 2000 ->
+ flush(100),
+ exit({receive_stream_timeout, CTag, From})
+ end.