summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-10-07 14:03:31 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-10-11 13:32:38 +0100
commit92f83a1af091cb0d2e1344efac02cd9286c40920 (patch)
tree7f56c35db7aed5bedc53989ff1aa0903df3543b0
parent49a47586b075c19e5af521a3f1d333ed8fab4654 (diff)
downloadrabbitmq-server-git-92f83a1af091cb0d2e1344efac02cd9286c40920.tar.gz
Improve consuming from a stream using AMQP 1.0
Allow an offset spec to be used to attach at an appropriate point in the stream. This is done by specifying a source filter with the key rabbitmq:stream-offset-spec. The offset is also included as a message annotation with the key x-stream-offset. When a link is detached we also issue a basic.cancel to the 0.9.1 channel. If this wasn't done and you detached then re-attached a link for the same queue you'd get a consumer-tag offset error from the 0.9.1 channel.
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl27
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl28
-rw-r--r--deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl17
-rw-r--r--deps/rabbitmq_amqp1_0/test/system_SUITE.erl19
-rwxr-xr-xdeps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs44
-rw-r--r--deps/rabbitmq_codegen/.DS_Storebin0 -> 6148 bytes
6 files changed, 127 insertions, 8 deletions
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
index 42248ce0ab..a3eccc7587 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl
@@ -12,6 +12,7 @@
-define(PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>).
-define(APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>).
-define(MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>).
+-define(STREAM_OFFSET_HEADER, <<"x-stream-offset">>).
-define(FOOTER, <<"x-amqp-1.0-footer">>).
-include_lib("amqp_client/include/amqp_client.hrl").
@@ -201,11 +202,29 @@ annotated_message(RKey, #'basic.deliver'{redelivered = Redelivered},
first_acquirer = not Redelivered,
delivery_count = undefined},
HeadersBin = amqp10_framing:encode_bin(Header10),
- MsgAnnoBin =
+ MsgAnnoBin0 =
case table_lookup(Headers, ?MESSAGE_ANNOTATIONS_HEADER) of
undefined -> <<>>;
{_, MABin} -> MABin
- end,
+ end,
+ MsgAnnoBin =
+ case table_lookup(Headers, ?STREAM_OFFSET_HEADER) of
+ undefined ->
+ MsgAnnoBin0;
+ {_, StreamOffset} when is_integer(StreamOffset) ->
+ case amqp10_framing:decode_bin(MsgAnnoBin0) of
+ [#'v1_0.message_annotations'{content = C0} = MA] ->
+ Contents = map_add(utf8, ?STREAM_OFFSET_HEADER,
+ ulong, StreamOffset, C0),
+ amqp10_framing:encode_bin(
+ MA#'v1_0.message_annotations'{content = Contents});
+ [] ->
+ Contents = map_add(utf8, ?STREAM_OFFSET_HEADER,
+ ulong, StreamOffset, []),
+ amqp10_framing:encode_bin(
+ #'v1_0.message_annotations'{content = Contents})
+ end
+ end,
PropsBin =
case table_lookup(Headers, ?PROPERTIES_HEADER) of
{_, Props10Bin} ->
@@ -259,3 +278,7 @@ wrap(Type, Val) ->
table_lookup(undefined, _) -> undefined;
table_lookup(Headers, Header) -> rabbit_misc:table_lookup(Headers, Header).
+map_add(_T, _Key, _Type, undefined, Acc) ->
+ Acc;
+map_add(KeyType, Key, Type, Value, Acc) ->
+ [{wrap(KeyType, Key), wrap(Type, Value)} | Acc].
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
index 3e58a23b76..2c8da853c7 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl
@@ -7,7 +7,7 @@
-module(rabbit_amqp1_0_outgoing_link).
--export([attach/3, delivery/6, transferred/3, credit_drained/3, flow/3]).
+-export([attach/3, detach/3, delivery/6, transferred/3, credit_drained/3, flow/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_amqp1_0.hrl").
@@ -24,6 +24,11 @@
default_outcome,
route_state}).
+detach(#'v1_0.detach'{handle = Handle}, BCh,_Link) ->
+ CTag = handle_to_ctag(Handle),
+ rabbit_amqp1_0_channel:call(BCh, #'basic.cancel'{consumer_tag = CTag}),
+ ok.
+
attach(#'v1_0.attach'{name = Name,
handle = Handle,
source = Source,
@@ -46,6 +51,8 @@ attach(#'v1_0.attach'{name = Name,
DCh) of
{ok, Source1, OutgoingLink = #outgoing_link{queue = QueueName}} ->
CTag = handle_to_ctag(Handle),
+ Args = source_filters_to_consumer_args(Source1),
+
case rabbit_amqp1_0_channel:subscribe(
BCh, #'basic.consume'{
queue = QueueName,
@@ -56,7 +63,8 @@ attach(#'v1_0.attach'{name = Name,
no_ack = false,
%% TODO exclusive?
exclusive = false,
- arguments = [{<<"x-credit">>, table,
+ arguments = Args ++
+ [{<<"x-credit">>, table,
[{<<"credit">>, long, 0},
{<<"drain">>, bool, false}]}]},
self()) of
@@ -140,6 +148,7 @@ default(Thing, _Default) -> Thing.
ensure_source(Source = #'v1_0.source'{address = Address,
dynamic = Dynamic,
durable = Durable,
+ filter = _Filters,
%% TODO
expiry_policy = _ExpiryPolicy,
%% TODO
@@ -238,3 +247,18 @@ transferred(DeliveryTag, Channel,
ok
end,
Link#outgoing_link{delivery_count = serial_add(Count, 1)}.
+
+source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
+ Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
+ case lists:keyfind(Key, 1, KVList) of
+ {_, {timestamp, Ts}} ->
+ [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
+ {_, {utf8, Spec}} ->
+ [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc
+ {_, {_, Offset}} when is_integer(Offset) ->
+ [{<<"x-stream-offset">>, long, Offset}]; %% integer offset
+ _ ->
+ []
+ end;
+source_filters_to_consumer_args(_Source) ->
+ [].
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
index d1899ece06..b2a466785b 100644
--- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
+++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl
@@ -85,6 +85,11 @@ handle_info(#'basic.consume_ok'{}, State) ->
%% Handled above
{noreply, State};
+handle_info(#'basic.cancel_ok'{}, State) ->
+ %% just ignore this for now,
+ %% At some point we should send the detach here but then we'd need to track
+ %% consumer tags -> link handle somewhere
+ {noreply, State};
handle_info({#'basic.deliver'{ consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag } = Deliver, Msg},
State = #state{frame_max = FrameMax,
@@ -294,11 +299,19 @@ handle_control(#'v1_0.disposition'{state = Outcome,
{Reply, Session1} -> {reply, Reply, state(Session1, State)}
end;
-handle_control(#'v1_0.detach'{ handle = Handle }, State) ->
+handle_control(#'v1_0.detach'{handle = Handle} = Detach,
+ #state{backing_channel = BCh} = State) ->
%% TODO keep the state around depending on the lifetime
%% TODO outgoing links?
+ case get({out, Handle}) of
+ undefined ->
+ ok;
+ Link ->
+ erase({out, Handle}),
+ ok = rabbit_amqp1_0_outgoing_link:detach(Detach, BCh, Link)
+ end,
erase({in, Handle}),
- {reply, #'v1_0.detach'{ handle = Handle }, State};
+ {reply, #'v1_0.detach'{handle = Handle}, State};
handle_control(#'v1_0.end'{}, _State = #state{ writer_pid = Sock }) ->
ok = rabbit_amqp1_0_writer:send_command(Sock, #'v1_0.end'{}),
diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE.erl b/deps/rabbitmq_amqp1_0/test/system_SUITE.erl
index bf4117fbbc..24f63a267a 100644
--- a/deps/rabbitmq_amqp1_0/test/system_SUITE.erl
+++ b/deps/rabbitmq_amqp1_0/test/system_SUITE.erl
@@ -37,12 +37,15 @@ groups() ->
auth_failure,
access_failure,
access_failure_not_allowed,
- access_failure_send
+ access_failure_send,
+ streams
]},
{java, [], [
roundtrip
]},
- {streams, [], [
+ {streams, [
+ streams
+ ], [
]}
].
@@ -135,6 +138,15 @@ roundtrip(Config) ->
{java, "RoundTripTest"}
]).
+streams(Config) ->
+ Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q">>,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, "stream"}]}),
+ run(Config, [
+ {dotnet, "streams"}
+ ]).
+
roundtrip_to_amqp_091(Config) ->
run(Config, [
{dotnet, "roundtrip_to_amqp_091"}
@@ -204,6 +216,9 @@ routing(Config) ->
amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q">>,
durable = true,
arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}),
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}),
amqp_channel:call(Ch, #'queue.declare'{queue = <<"autodel_q">>,
auto_delete = true}),
run(Config, [
diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
index f91f51c5fc..6f083c2309 100755
--- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
+++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs
@@ -75,6 +75,10 @@ module Test =
if a <> b then
failwith (sprintf "Expected: %A\r\nGot: %A" a b)
+ let assertNotNull a =
+ if a = null then
+ failwith (sprintf "Null not expected")
+
let assertTrue b =
if not b then
failwith (sprintf "Expected True got False!")
@@ -142,6 +146,43 @@ module Test =
assertEqual rtd.Properties.CorrelationId corr
()
+ let streams uri =
+ use c = connect uri
+ let name = "streams-test"
+ let address = "/amq/queue/stream_q2"
+ let sender = SenderLink(c.Session, name + "-sender" , address)
+ //for body in sampleTypes do
+ let body = "hi"B :> obj
+
+ let corr = "correlation"
+ new Message(body,
+ Properties = new Properties(CorrelationId = corr))
+ |> sender.Send
+ //TODO wait for settlement
+ let specs = [box("first");
+ box("last");
+ box("10m");
+ box(0)]
+ for spec in specs do
+ printfn "testing streams spec %A" spec
+ let filterSet = Map()
+ filterSet.Add(Symbol "rabbitmq:stream-offset-spec", spec)
+ let source = Source(Address = address,
+ FilterSet = filterSet)
+ let attach = Attach(Source = source)
+ let attached = new OnAttached (fun _ _ -> ())
+ let receiver = ReceiverLink(c.Session, Guid.NewGuid().ToString(), attach, attached)
+ receiver.SetCredit(100, true)
+ let rtd = receiver.Receive()
+ assertNotNull rtd
+ assertTrue (rtd.MessageAnnotations.Map.Count = 1)
+ let (result, _) = rtd.MessageAnnotations.Map.TryGetValue("x-stream-offset")
+ assertTrue result
+ assertEqual body rtd.Body
+ assertEqual rtd.Properties.CorrelationId corr
+ receiver.Close()
+ ()
+
open RabbitMQ.Client
let roundtrip_to_amqp_091 uri =
@@ -478,6 +519,9 @@ let main argv =
| [AsLower "invalid_routes"; uri] ->
invalidRoutes uri
0
+ | [AsLower "streams"; uri] ->
+ streams uri
+ 0
| _ ->
printfn "test %A not found. usage: <test> <uri>" argv
1
diff --git a/deps/rabbitmq_codegen/.DS_Store b/deps/rabbitmq_codegen/.DS_Store
new file mode 100644
index 0000000000..8fa6166683
--- /dev/null
+++ b/deps/rabbitmq_codegen/.DS_Store
Binary files differ