summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-13 13:16:45 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-13 13:16:45 +0100
commita44e99a3273118b222c60e8c388c2de8e6e97407 (patch)
tree369587d7e5e80b74f30d2b11ade4ae8318850c48
parent2ed4c29c8bc278b817b82747d28fc86a46b80f3d (diff)
downloadrabbitmq-server-a44e99a3273118b222c60e8c388c2de8e6e97407.tar.gz
Prototype of fast RPC replies.
-rw-r--r--src/rabbit_channel.erl87
-rw-r--r--src/rabbit_exchange.erl10
2 files changed, 92 insertions, 5 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d2f6719c..0301239a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,8 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, send_credit_reply/2, send_drained/2]).
+-export([send_command/2, deliver/4, deliver_fast_reply/2,
+ send_credit_reply/2, send_drained/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/1]).
@@ -39,7 +40,7 @@
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, mandatory, capabilities, trace_state,
- consumer_prefetch}).
+ consumer_prefetch, reply_consumer}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -142,6 +143,10 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+deliver_fast_reply(<<"amq.consumer.", Enc/binary>>, Delivery) ->
+ Pid = binary_to_term(base64:decode(Enc)),
+ gen_server2:cast(Pid, {deliver_fast_reply, Delivery}).
+
send_credit_reply(Pid, Len) ->
gen_server2:cast(Pid, {send_credit_reply, Len}).
@@ -219,7 +224,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
mandatory = dtree:empty(),
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0},
+ consumer_prefetch = 0,
+ reply_consumer = none},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -326,6 +332,28 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({deliver_fast_reply, _Del}, State = #ch{state = closing}) ->
+ noreply(State);
+handle_cast({deliver_fast_reply, _Del}, State = #ch{reply_consumer = none}) ->
+ noreply(State);
+%% TODO mandatory and confirm need handling here
+handle_cast({deliver_fast_reply, #delivery{message =
+ #basic_message{exchange_name = ExchangeName,
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ reply_consumer = ConsumerTag}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid,
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = false,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(State);
+
handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.credit_ok'{available = Len}),
@@ -609,6 +637,21 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+%% TODO this constitutes a security hole!
+maybe_set_fast_reply_to(
+ C = #content{properties = P = #'P_basic'{reply_to = <<"amq.reply-consumer">>}},
+ #ch{reply_consumer = ReplyConsumer}) ->
+ case ReplyConsumer of
+ none -> rabbit_misc:protocol_error(
+ not_allowed, "fast reply consumer does not exist", []);
+ _ -> Self = base64:encode(term_to_binary(self())),
+ ReplyTo = <<"amq.consumer.", Self/binary>>,
+ rabbit_binary_generator:clear_encoded_content(
+ C#content{properties = P#'P_basic'{reply_to = ReplyTo}})
+ end;
+maybe_set_fast_reply_to(C, _State) ->
+ C.
+
record_confirms([], State) ->
State;
record_confirms(MXs, State = #ch{confirmed = C}) ->
@@ -686,7 +729,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = #content {properties = Props} =
- rabbit_binary_parser:ensure_content_decoded(Content),
+ maybe_set_fast_reply_to(
+ rabbit_binary_parser:ensure_content_decoded(Content), State),
check_user_id_header(Props, State),
check_expiration_header(Props),
DoConfirm = Tx =/= none orelse ConfirmEnabled,
@@ -761,6 +805,41 @@ handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
{reply, #'basic.get_empty'{}, State}
end;
+%% TODO we should support cancel!
+handle_method(#'basic.consume'{queue = <<"amq.reply-to">>,
+ consumer_tag = CTag0,
+ no_ack = NoAck,
+ nowait = NoWait},
+ _, State = #ch{reply_consumer = ReplyConsumer,
+ consumer_mapping = ConsumerMapping}) ->
+ case dict:find(CTag0, ConsumerMapping) of
+ error ->
+ case {ReplyConsumer, NoAck} of
+ {none, true} ->
+ CTag = case CTag0 of
+ <<>> -> rabbit_guid:binary(
+ rabbit_guid:gen_secure(), "amq.ctag");
+ Other -> Other
+ end,
+ State1 = State#ch{reply_consumer = CTag},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
+ {reply, Rep, State1}
+ end;
+ {_, false} ->
+ rabbit_misc:protocol_error(
+ not_allowed, "reply consumer cannot acknowledge", []);
+ _ ->
+ rabbit_misc:protocol_error(
+ not_allowed, "reply consumer already set", [])
+ end;
+ {ok, _} ->
+ %% Attempted reuse of consumer tag.
+ rabbit_misc:protocol_error(
+ not_allowed, "attempt to reuse consumer tag '~s'", [CTag0])
+ end;
+
handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = _, % FIXME: implement
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a1772f0a..94759cff 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -347,11 +347,19 @@ route(#exchange{name = #resource{virtual_host = VHost, name = RName} = XName,
case {RName, rabbit_exchange_decorator:select(route, Decorators)} of
{<<"">>, []} ->
%% Optimisation
- [rabbit_misc:r(VHost, queue, RK) || RK <- lists:usort(RKs)];
+ %% TODO what if there are decorators? Is that even a sane case?
+ RKsSorted = lists:usort(RKs),
+ [rabbit_channel:deliver_fast_reply(RK, Delivery) ||
+ RK <- RKsSorted, fast_reply(RK)],
+ [rabbit_misc:r(VHost, queue, RK) || RK <- RKsSorted,
+ not fast_reply(RK)];
{_, SelectedDecorators} ->
lists:usort(route1(Delivery, SelectedDecorators, {[X], XName, []}))
end.
+fast_reply(<<"amq.consumer.", _/binary>>) -> true;
+fast_reply(_) -> false.
+
route1(_, _, {[], _, QNames}) ->
QNames;
route1(Delivery, Decorators,