summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-27 09:52:48 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-27 09:52:48 +0200
commitb758edc64247ef8b33bf8c897b37013d9eef99ec (patch)
tree7363f632f390b14377e756abff52ff0acfdf252c
parentc78b04307bf87eb604ebe6b19abefaa9d22984dc (diff)
downloadrabbitmq-server-git-b758edc64247ef8b33bf8c897b37013d9eef99ec.tar.gz
Handle no-context delivery in web stomprabbitmq-server-3508-web-stomp-stream-consuming
To support messages from streams, which do not have a context (for credit flow). References rabbitmq/rabbitmq-stomp#138 Fixes #3508
-rw-r--r--deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl2
-rw-r--r--deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl9
2 files changed, 10 insertions, 1 deletions
diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
index 30a0b94b35..4cbdaf10ba 100644
--- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
+++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
@@ -827,7 +827,7 @@ send_delivery(Delivery = #'basic.deliver'{consumer_tag = ConsumerTag},
NewState.
notify_received(undefined) ->
- %% no notification for quorum queues
+ %% no notification for quorum queues and streams
ok;
notify_received(DeliveryCtx) ->
%% notification for flow control
diff --git a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl
index 3a1bcd42cb..ebec3da671 100644
--- a/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl
+++ b/deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl
@@ -183,6 +183,15 @@ websocket_info({Delivery = #'basic.deliver'{},
DeliveryCtx,
ProcState0),
{ok, State#state{ proc_state = ProcState }};
+websocket_info({Delivery = #'basic.deliver'{},
+ #amqp_msg{props = Props, payload = Payload}},
+ State=#state{ proc_state = ProcState0 }) ->
+ ProcState = rabbit_stomp_processor:send_delivery(Delivery,
+ Props,
+ Payload,
+ undefined,
+ ProcState0),
+ {ok, State#state{ proc_state = ProcState }};
websocket_info(#'basic.cancel'{consumer_tag = Ctag},
State=#state{ proc_state = ProcState0 }) ->
case rabbit_stomp_processor:cancel_consumer(Ctag, ProcState0) of