diff options
-rw-r--r-- | src/rabbit_channel.erl | 87 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 10 |
2 files changed, 92 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d2f6719c75..0301239a30 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]). +-export([send_command/2, deliver/4, deliver_fast_reply/2, + send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/1]). @@ -39,7 +40,7 @@ queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, mandatory, capabilities, trace_state, - consumer_prefetch}). + consumer_prefetch, reply_consumer}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -142,6 +143,10 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +deliver_fast_reply(<<"amq.consumer.", Enc/binary>>, Delivery) -> + Pid = binary_to_term(base64:decode(Enc)), + gen_server2:cast(Pid, {deliver_fast_reply, Delivery}). + send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). @@ -219,7 +224,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, mandatory = dtree:empty(), capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), - consumer_prefetch = 0}, + consumer_prefetch = 0, + reply_consumer = none}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -326,6 +332,28 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({deliver_fast_reply, _Del}, State = #ch{state = closing}) -> + noreply(State); +handle_cast({deliver_fast_reply, _Del}, State = #ch{reply_consumer = none}) -> + noreply(State); +%% TODO mandatory and confirm need handling here +handle_cast({deliver_fast_reply, #delivery{message = + #basic_message{exchange_name = ExchangeName, + routing_keys = [RoutingKey | _CcRoutes], + content = Content}}}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + reply_consumer = ConsumerTag}) -> + ok = rabbit_writer:send_command( + WriterPid, + #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = false, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + Content), + noreply(State); + handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.credit_ok'{available = Len}), @@ -609,6 +637,21 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +%% TODO this constitutes a security hole! +maybe_set_fast_reply_to( + C = #content{properties = P = #'P_basic'{reply_to = <<"amq.reply-consumer">>}}, + #ch{reply_consumer = ReplyConsumer}) -> + case ReplyConsumer of + none -> rabbit_misc:protocol_error( + not_allowed, "fast reply consumer does not exist", []); + _ -> Self = base64:encode(term_to_binary(self())), + ReplyTo = <<"amq.consumer.", Self/binary>>, + rabbit_binary_generator:clear_encoded_content( + C#content{properties = P#'P_basic'{reply_to = ReplyTo}}) + end; +maybe_set_fast_reply_to(C, _State) -> + C. + record_confirms([], State) -> State; record_confirms(MXs, State = #ch{confirmed = C}) -> @@ -686,7 +729,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = #content {properties = Props} = - rabbit_binary_parser:ensure_content_decoded(Content), + maybe_set_fast_reply_to( + rabbit_binary_parser:ensure_content_decoded(Content), State), check_user_id_header(Props, State), check_expiration_header(Props), DoConfirm = Tx =/= none orelse ConfirmEnabled, @@ -761,6 +805,41 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, {reply, #'basic.get_empty'{}, State} end; +%% TODO we should support cancel! +handle_method(#'basic.consume'{queue = <<"amq.reply-to">>, + consumer_tag = CTag0, + no_ack = NoAck, + nowait = NoWait}, + _, State = #ch{reply_consumer = ReplyConsumer, + consumer_mapping = ConsumerMapping}) -> + case dict:find(CTag0, ConsumerMapping) of + error -> + case {ReplyConsumer, NoAck} of + {none, true} -> + CTag = case CTag0 of + <<>> -> rabbit_guid:binary( + rabbit_guid:gen_secure(), "amq.ctag"); + Other -> Other + end, + State1 = State#ch{reply_consumer = CTag}, + case NoWait of + true -> {noreply, State1}; + false -> Rep = #'basic.consume_ok'{consumer_tag = CTag}, + {reply, Rep, State1} + end; + {_, false} -> + rabbit_misc:protocol_error( + not_allowed, "reply consumer cannot acknowledge", []); + _ -> + rabbit_misc:protocol_error( + not_allowed, "reply consumer already set", []) + end; + {ok, _} -> + %% Attempted reuse of consumer tag. + rabbit_misc:protocol_error( + not_allowed, "attempt to reuse consumer tag '~s'", [CTag0]) + end; + handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, no_local = _, % FIXME: implement diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a1772f0afd..94759cffef 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -347,11 +347,19 @@ route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName, case {RName, rabbit_exchange_decorator:select(route, Decorators)} of {<<"">>, []} -> %% Optimisation - [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)]; + %% TODO what if there are decorators? Is that even a sane case? + RKsSorted = lists:usort(RKs), + [rabbit_channel:deliver_fast_reply(RK, Delivery) || + RK <- RKsSorted, fast_reply(RK)], + [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted, + not fast_reply(RK)]; {_, SelectedDecorators} -> lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []})) end. +fast_reply(<<"amq.consumer.", _/binary>>) -> true; +fast_reply(_) -> false. + route1(_, _, {[], _, QNames}) -> QNames; route1(Delivery, Decorators, |