diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-10-07 14:03:31 +0100 |
---|---|---|
committer | Karl Nilsson <kjnilsson@gmail.com> | 2021-10-11 13:32:38 +0100 |
commit | 92f83a1af091cb0d2e1344efac02cd9286c40920 (patch) | |
tree | 7f56c35db7aed5bedc53989ff1aa0903df3543b0 | |
parent | 49a47586b075c19e5af521a3f1d333ed8fab4654 (diff) | |
download | rabbitmq-server-git-92f83a1af091cb0d2e1344efac02cd9286c40920.tar.gz |
Improve consuming from a stream using AMQP 1.0
Allow an offset spec to be used to attach at an appropriate point in the
stream. This is done by specifying a source filter with the key rabbitmq:stream-offset-spec.
The offset is also included as a message annotation with the key x-stream-offset.
When a link is detached we also issue a basic.cancel to the 0.9.1 channel. If this wasn't done
and you detached then re-attached a link for the same queue you'd get a consumer-tag offset
error from the 0.9.1 channel.
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl | 27 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl | 28 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl | 17 | ||||
-rw-r--r-- | deps/rabbitmq_amqp1_0/test/system_SUITE.erl | 19 | ||||
-rwxr-xr-x | deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs | 44 | ||||
-rw-r--r-- | deps/rabbitmq_codegen/.DS_Store | bin | 0 -> 6148 bytes |
6 files changed, 127 insertions, 8 deletions
diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl index 42248ce0ab..a3eccc7587 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_message.erl @@ -12,6 +12,7 @@ -define(PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>). -define(APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>). -define(MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>). +-define(STREAM_OFFSET_HEADER, <<"x-stream-offset">>). -define(FOOTER, <<"x-amqp-1.0-footer">>). -include_lib("amqp_client/include/amqp_client.hrl"). @@ -201,11 +202,29 @@ annotated_message(RKey, #'basic.deliver'{redelivered = Redelivered}, first_acquirer = not Redelivered, delivery_count = undefined}, HeadersBin = amqp10_framing:encode_bin(Header10), - MsgAnnoBin = + MsgAnnoBin0 = case table_lookup(Headers, ?MESSAGE_ANNOTATIONS_HEADER) of undefined -> <<>>; {_, MABin} -> MABin - end, + end, + MsgAnnoBin = + case table_lookup(Headers, ?STREAM_OFFSET_HEADER) of + undefined -> + MsgAnnoBin0; + {_, StreamOffset} when is_integer(StreamOffset) -> + case amqp10_framing:decode_bin(MsgAnnoBin0) of + [#'v1_0.message_annotations'{content = C0} = MA] -> + Contents = map_add(utf8, ?STREAM_OFFSET_HEADER, + ulong, StreamOffset, C0), + amqp10_framing:encode_bin( + MA#'v1_0.message_annotations'{content = Contents}); + [] -> + Contents = map_add(utf8, ?STREAM_OFFSET_HEADER, + ulong, StreamOffset, []), + amqp10_framing:encode_bin( + #'v1_0.message_annotations'{content = Contents}) + end + end, PropsBin = case table_lookup(Headers, ?PROPERTIES_HEADER) of {_, Props10Bin} -> @@ -259,3 +278,7 @@ wrap(Type, Val) -> table_lookup(undefined, _) -> undefined; table_lookup(Headers, Header) -> rabbit_misc:table_lookup(Headers, Header). +map_add(_T, _Key, _Type, undefined, Acc) -> + Acc; +map_add(KeyType, Key, Type, Value, Acc) -> + [{wrap(KeyType, Key), wrap(Type, Value)} | Acc]. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl index 3e58a23b76..2c8da853c7 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_outgoing_link.erl @@ -7,7 +7,7 @@ -module(rabbit_amqp1_0_outgoing_link). --export([attach/3, delivery/6, transferred/3, credit_drained/3, flow/3]). +-export([attach/3, detach/3, delivery/6, transferred/3, credit_drained/3, flow/3]). -include_lib("amqp_client/include/amqp_client.hrl"). -include("rabbit_amqp1_0.hrl"). @@ -24,6 +24,11 @@ default_outcome, route_state}). +detach(#'v1_0.detach'{handle = Handle}, BCh,_Link) -> + CTag = handle_to_ctag(Handle), + rabbit_amqp1_0_channel:call(BCh, #'basic.cancel'{consumer_tag = CTag}), + ok. + attach(#'v1_0.attach'{name = Name, handle = Handle, source = Source, @@ -46,6 +51,8 @@ attach(#'v1_0.attach'{name = Name, DCh) of {ok, Source1, OutgoingLink = #outgoing_link{queue = QueueName}} -> CTag = handle_to_ctag(Handle), + Args = source_filters_to_consumer_args(Source1), + case rabbit_amqp1_0_channel:subscribe( BCh, #'basic.consume'{ queue = QueueName, @@ -56,7 +63,8 @@ attach(#'v1_0.attach'{name = Name, no_ack = false, %% TODO exclusive? exclusive = false, - arguments = [{<<"x-credit">>, table, + arguments = Args ++ + [{<<"x-credit">>, table, [{<<"credit">>, long, 0}, {<<"drain">>, bool, false}]}]}, self()) of @@ -140,6 +148,7 @@ default(Thing, _Default) -> Thing. ensure_source(Source = #'v1_0.source'{address = Address, dynamic = Dynamic, durable = Durable, + filter = _Filters, %% TODO expiry_policy = _ExpiryPolicy, %% TODO @@ -238,3 +247,18 @@ transferred(DeliveryTag, Channel, ok end, Link#outgoing_link{delivery_count = serial_add(Count, 1)}. + +source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) -> + Key = {symbol, <<"rabbitmq:stream-offset-spec">>}, + case lists:keyfind(Key, 1, KVList) of + {_, {timestamp, Ts}} -> + [{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps + {_, {utf8, Spec}} -> + [{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc + {_, {_, Offset}} when is_integer(Offset) -> + [{<<"x-stream-offset">>, long, Offset}]; %% integer offset + _ -> + [] + end; +source_filters_to_consumer_args(_Source) -> + []. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl index d1899ece06..b2a466785b 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_process.erl @@ -85,6 +85,11 @@ handle_info(#'basic.consume_ok'{}, State) -> %% Handled above {noreply, State}; +handle_info(#'basic.cancel_ok'{}, State) -> + %% just ignore this for now, + %% At some point we should send the detach here but then we'd need to track + %% consumer tags -> link handle somewhere + {noreply, State}; handle_info({#'basic.deliver'{ consumer_tag = ConsumerTag, delivery_tag = DeliveryTag } = Deliver, Msg}, State = #state{frame_max = FrameMax, @@ -294,11 +299,19 @@ handle_control(#'v1_0.disposition'{state = Outcome, {Reply, Session1} -> {reply, Reply, state(Session1, State)} end; -handle_control(#'v1_0.detach'{ handle = Handle }, State) -> +handle_control(#'v1_0.detach'{handle = Handle} = Detach, + #state{backing_channel = BCh} = State) -> %% TODO keep the state around depending on the lifetime %% TODO outgoing links? + case get({out, Handle}) of + undefined -> + ok; + Link -> + erase({out, Handle}), + ok = rabbit_amqp1_0_outgoing_link:detach(Detach, BCh, Link) + end, erase({in, Handle}), - {reply, #'v1_0.detach'{ handle = Handle }, State}; + {reply, #'v1_0.detach'{handle = Handle}, State}; handle_control(#'v1_0.end'{}, _State = #state{ writer_pid = Sock }) -> ok = rabbit_amqp1_0_writer:send_command(Sock, #'v1_0.end'{}), diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE.erl b/deps/rabbitmq_amqp1_0/test/system_SUITE.erl index bf4117fbbc..24f63a267a 100644 --- a/deps/rabbitmq_amqp1_0/test/system_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE.erl @@ -37,12 +37,15 @@ groups() -> auth_failure, access_failure, access_failure_not_allowed, - access_failure_send + access_failure_send, + streams ]}, {java, [], [ roundtrip ]}, - {streams, [], [ + {streams, [ + streams + ], [ ]} ]. @@ -135,6 +138,15 @@ roundtrip(Config) -> {java, "RoundTripTest"} ]). +streams(Config) -> + Ch = rabbit_ct_client_helpers:open_channel(Config, 0), + amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q">>, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, "stream"}]}), + run(Config, [ + {dotnet, "streams"} + ]). + roundtrip_to_amqp_091(Config) -> run(Config, [ {dotnet, "roundtrip_to_amqp_091"} @@ -204,6 +216,9 @@ routing(Config) -> amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q">>, durable = true, arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}), + amqp_channel:call(Ch, #'queue.declare'{queue = <<"stream_q2">>, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, StreamQT}]}), amqp_channel:call(Ch, #'queue.declare'{queue = <<"autodel_q">>, auto_delete = true}), run(Config, [ diff --git a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs index f91f51c5fc..6f083c2309 100755 --- a/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs +++ b/deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs @@ -75,6 +75,10 @@ module Test = if a <> b then failwith (sprintf "Expected: %A\r\nGot: %A" a b) + let assertNotNull a = + if a = null then + failwith (sprintf "Null not expected") + let assertTrue b = if not b then failwith (sprintf "Expected True got False!") @@ -142,6 +146,43 @@ module Test = assertEqual rtd.Properties.CorrelationId corr () + let streams uri = + use c = connect uri + let name = "streams-test" + let address = "/amq/queue/stream_q2" + let sender = SenderLink(c.Session, name + "-sender" , address) + //for body in sampleTypes do + let body = "hi"B :> obj + + let corr = "correlation" + new Message(body, + Properties = new Properties(CorrelationId = corr)) + |> sender.Send + //TODO wait for settlement + let specs = [box("first"); + box("last"); + box("10m"); + box(0)] + for spec in specs do + printfn "testing streams spec %A" spec + let filterSet = Map() + filterSet.Add(Symbol "rabbitmq:stream-offset-spec", spec) + let source = Source(Address = address, + FilterSet = filterSet) + let attach = Attach(Source = source) + let attached = new OnAttached (fun _ _ -> ()) + let receiver = ReceiverLink(c.Session, Guid.NewGuid().ToString(), attach, attached) + receiver.SetCredit(100, true) + let rtd = receiver.Receive() + assertNotNull rtd + assertTrue (rtd.MessageAnnotations.Map.Count = 1) + let (result, _) = rtd.MessageAnnotations.Map.TryGetValue("x-stream-offset") + assertTrue result + assertEqual body rtd.Body + assertEqual rtd.Properties.CorrelationId corr + receiver.Close() + () + open RabbitMQ.Client let roundtrip_to_amqp_091 uri = @@ -478,6 +519,9 @@ let main argv = | [AsLower "invalid_routes"; uri] -> invalidRoutes uri 0 + | [AsLower "streams"; uri] -> + streams uri + 0 | _ -> printfn "test %A not found. usage: <test> <uri>" argv 1 diff --git a/deps/rabbitmq_codegen/.DS_Store b/deps/rabbitmq_codegen/.DS_Store Binary files differnew file mode 100644 index 0000000000..8fa6166683 --- /dev/null +++ b/deps/rabbitmq_codegen/.DS_Store |