summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-01-28 13:29:59 +0100
committerdcorbacho <dparracorbacho@piotal.io>2020-01-28 15:35:02 +0100
commitb393653c34d218c1bfbe6d99a0a7b395149858aa (patch)
tree6216910c5b26668021f4327b4d450c681a77e9b1
parent3418b10b3623c7edc9d9ea03969886bd5ffc3222 (diff)
downloadrabbitmq-server-git-b393653c34d218c1bfbe6d99a0a7b395149858aa.tar.gz
Fixes and tests
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_stream2_queue.erl3
-rw-r--r--test/rabbit_stream2_queue_SUITE.erl52
3 files changed, 52 insertions, 7 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 92855bfe3a..2e930e1321 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2792,7 +2792,7 @@ handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins,
lists:foldl(
fun ({_, _, Offs, _, #basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}, {DTag, Bins, UAMQ}) ->
+ content = Content}}, {DTag, UAMQ}) ->
Deliver = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DTag,
redelivered = false,
@@ -2803,7 +2803,7 @@ handle_stream_deliveries(ConsumerTag, QPid, QName, MsgBins,
{DTag +1,
?QUEUE:in({DTag, ConsumerTag, DeliveredAt,
{QPid, QName, Offs}}, UAMQ)}
- end, {NextTag0, [], UAMQ0}, MsgBins),
+ end, {NextTag0, UAMQ0}, MsgBins),
% rabbit_log:info("handle stream deliveries next tag ~w", [NextTag]),
?INCR_STATS(queue_stats, QName, NextTag - NextTag0, deliver, State),
State#ch{next_tag = NextTag,
diff --git a/src/rabbit_stream2_queue.erl b/src/rabbit_stream2_queue.erl
index f90c7bb42f..1d4530226b 100644
--- a/src/rabbit_stream2_queue.erl
+++ b/src/rabbit_stream2_queue.erl
@@ -237,7 +237,8 @@ declare(Q0) ->
Opts = amqqueue:get_options(Q0),
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
Replicas = rabbit_mnesia:cluster_nodes(all) -- [node()],
- N = ra_lib:derive_safe_string(atom_to_list(Name), 8),
+ LName = atom_to_list(Name),
+ N = ra_lib:derive_safe_string(LName, length(LName)),
Conf = #{reference => QName,
name => list_to_atom(N)},
{ok, LeaderPid, ReplicaPids} = osiris:start_cluster(N, Replicas, Conf),
diff --git a/test/rabbit_stream2_queue_SUITE.erl b/test/rabbit_stream2_queue_SUITE.erl
index 6b90a306e1..3e09d56c6f 100644
--- a/test/rabbit_stream2_queue_SUITE.erl
+++ b/test/rabbit_stream2_queue_SUITE.erl
@@ -49,6 +49,7 @@ all_tests() ->
[
roundtrip,
time_travel,
+ idempotent_declare_queue,
delete_queue
].
@@ -242,6 +243,30 @@ time_travel(Config) ->
end,
ok.
+idempotent_declare_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [MnesiaDir | _] = rabbit_ct_broker_helpers:get_node_configs(Config, mnesia_dir),
+
+ StreamsDir = filename:join(MnesiaDir, "streams"),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
+ QName = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
+
+ ?assertEqual([[QName]], lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "--no-table-headers"]))),
+ ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)),
+
+ flush(100),
+ ok.
+
delete_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
[MnesiaDir | _] = rabbit_ct_broker_helpers:get_node_configs(Config, mnesia_dir),
@@ -249,20 +274,39 @@ delete_queue(Config) ->
StreamsDir = filename:join(MnesiaDir, "streams"),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+
QName = ?config(queue_name, Config),
+ QName2 = ?config(alt_queue_name, Config),
+
+ %% Let's declare two queues
?assertEqual({'queue.declare_ok', QName, 0, 0},
declare(Ch, QName, [{<<"x-queue-type">>, longstr,
?config(queue_type, Config)}])),
+ ?assertEqual({'queue.declare_ok', QName2, 0, 0},
+ declare(Ch, QName2, [{<<"x-queue-type">>, longstr,
+ ?config(queue_type, Config)}])),
publish_many(Ch, QName, 100),
+ publish_many(Ch, QName2, 100),
- ?assertEqual([[QName]], rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "--no-table-headers"])),
- ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)),
+ %% Check the queues are listed and have a data directory
+ Queues = lists:sort([[QName], [QName2]]),
+ ?assertEqual(Queues, lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "--no-table-headers"]))),
+ ?assertMatch({ok, [_, _]}, file:list_dir(StreamsDir)),
+ %% Delete one of the queues
?assertEqual({'queue.delete_ok', 0},
amqp_channel:call(Ch, #'queue.delete'{queue = QName})),
- ?assertEqual([], rabbit_ct_broker_helpers:rabbitmqctl_list(
+ ?assertEqual([[QName2]], rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "--no-table-headers"])),
+ ?assertMatch({ok, [_]}, file:list_dir(StreamsDir)),
+
+ %% Delete the other queue
+ ?assertEqual({'queue.delete_ok', 0},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QName2})),
+
+ ?assertEqual([], rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, 0, ["list_queues", "name", "--no-table-headers"])),
?assertMatch({ok, []}, file:list_dir(StreamsDir)),