diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-01-28 13:29:59 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-01-28 15:35:02 +0100 |
commit | b393653c34d218c1bfbe6d99a0a7b395149858aa (patch) | |
tree | 6216910c5b26668021f4327b4d450c681a77e9b1 | |
parent | 3418b10b3623c7edc9d9ea03969886bd5ffc3222 (diff) | |
download | rabbitmq-server-git-b393653c34d218c1bfbe6d99a0a7b395149858aa.tar.gz |
Fixes and tests
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_stream2_queue.erl | 3 | ||||
-rw-r--r-- | test/rabbit_stream2_queue_SUITE.erl | 52 |
3 files changed, 52 insertions, 7 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 92855bfe3a..2e930e1321 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2792,7 +2792,7 @@ handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins, lists:foldl( fun ({_, _, Offs, _, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], - content = Content}}, {DTag, Bins, UAMQ}) -> + content = Content}}, {DTag, UAMQ}) -> Deliver = #'basic.deliver'{consumer_tag = ConsumerTag, delivery_tag = DTag, redelivered = false, @@ -2803,7 +2803,7 @@ handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins, {DTag +1, ?QUEUE:in({DTag, ConsumerTag, DeliveredAt, {QPid, QName, Offs}}, UAMQ)} - end, {NextTag0, [], UAMQ0}, MsgBins), + end, {NextTag0, UAMQ0}, MsgBins), % rabbit_log:info("handle stream deliveries next tag ~w", [NextTag]), ?INCR_STATS(queue_stats, QName, NextTag - NextTag0, deliver, State), State#ch{next_tag = NextTag, diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl index f90c7bb42f..1d4530226b 100644 --- a/src/rabbit_stream2_queue.erl +++ b/src/rabbit_stream2_queue.erl @@ -237,7 +237,8 @@ declare(Q0) -> Opts = amqqueue:get_options(Q0), ActingUser = maps:get(user, Opts, ?UNKNOWN_USER), Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()], - N = ra_lib:derive_safe_string(atom_to_list(Name), 8), + LName = atom_to_list(Name), + N = ra_lib:derive_safe_string(LName, length(LName)), Conf = #{reference => QName, name => list_to_atom(N)}, {ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf), diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl index 6b90a306e1..3e09d56c6f 100644 --- a/test/rabbit_stream2_queue_SUITE.erl +++ b/test/rabbit_stream2_queue_SUITE.erl @@ -49,6 +49,7 @@ all_tests() -> [ roundtrip, time_travel, + idempotent_declare_queue, delete_queue ]. @@ -242,6 +243,30 @@ time_travel(Config) -> end, ok. +idempotent_declare_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [MnesiaDir | _] = rabbit_ct_broker_helpers:get_node_configs(Config, mnesia_dir), + + StreamsDir = filename:join(MnesiaDir, "streams"), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + QName = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), + + ?assertEqual([[QName]], lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "--no-table-headers"]))), + ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)), + + flush(100), + ok. + delete_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), [MnesiaDir | _] = rabbit_ct_broker_helpers:get_node_configs(Config, mnesia_dir), @@ -249,20 +274,39 @@ delete_queue(Config) -> StreamsDir = filename:join(MnesiaDir, "streams"), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QName = ?config(queue_name, Config), + QName2 = ?config(alt_queue_name, Config), + + %% Let's declare two queues ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [{<<"x-queue-type">>, longstr, ?config(queue_type, Config)}])), + ?assertEqual({'queue.declare_ok', QName2, 0, 0}, + declare(Ch, QName2, [{<<"x-queue-type">>, longstr, + ?config(queue_type, Config)}])), publish_many(Ch, QName, 100), + publish_many(Ch, QName2, 100), - ?assertEqual([[QName]], rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, ["list_queues", "name", "--no-table-headers"])), - ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)), + %% Check the queues are listed and have a data directory + Queues = lists:sort([[QName], [QName2]]), + ?assertEqual(Queues, lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "--no-table-headers"]))), + ?assertMatch({ok, [_, _]}, file:list_dir(StreamsDir)), + %% Delete one of the queues ?assertEqual({'queue.delete_ok', 0}, amqp_channel:call(Ch, #'queue.delete'{queue = QName})), - ?assertEqual([], rabbit_ct_broker_helpers:rabbitmqctl_list( + ?assertEqual([[QName2]], rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "--no-table-headers"])), + ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)), + + %% Delete the other queue + ?assertEqual({'queue.delete_ok', 0}, + amqp_channel:call(Ch, #'queue.delete'{queue = QName2})), + + ?assertEqual([], rabbit_ct_broker_helpers:rabbitmqctl_list( Config, 0, ["list_queues", "name", "--no-table-headers"])), ?assertMatch({ok, []}, file:list_dir(StreamsDir)), |