summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-18 14:39:05 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-18 14:39:05 +0100
commit262692bcbcf35a918c40cc764e22f70fc4508027 (patch)
tree40fb242443d218a799b0d5f8cf1864d8ae155bef
parent68b2e6e0f2b9fae37ad4a72fcb7bcc9a4dd4f188 (diff)
downloadrabbitmq-server-262692bcbcf35a918c40cc764e22f70fc4508027.tar.gz
Implement declaration.
-rw-r--r--src/rabbit_channel.erl42
1 files changed, 40 insertions, 2 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b61a53f8..472091d0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -147,8 +147,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
- [PidEnc, Key] = binary:split(Rest, <<".">>),
- Pid = binary_to_term(base64:decode(PidEnc)),
+ {ok, Pid, Key} = decode_fast_reply_to(Rest),
delegate:invoke_no_result(
Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}).
@@ -160,6 +159,27 @@ deliver_reply_local(Pid, Key, Delivery) ->
false -> ok
end.
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
+ exists;
+declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
+ case decode_fast_reply_to(Rest) of
+ {ok, Pid, Key} ->
+ rabbit_misc:with_exit_handler(
+ rabbit_misc:const(not_found),
+ fun() -> gen_server2:call(Pid, {declare_fast_reply_to, Key}) end);
+ error ->
+ not_found
+ end;
+declare_fast_reply_to(_) ->
+ not_found.
+
+decode_fast_reply_to(Suffix) ->
+ case binary:split(Suffix, <<".">>) of
+ [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
+ {ok, Pid, Key};
+ _ -> error
+ end.
+
send_credit_reply(Pid, Len) ->
gen_server2:cast(Pid, {send_credit_reply, Len}).
@@ -281,6 +301,13 @@ handle_call({info, Items}, _From, State) ->
handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+handle_call({declare_fast_reply_to, Key}, _From,
+ State = #ch{reply_consumer = Consumer}) ->
+ reply(case Consumer of
+ {_, _, Key} -> exists;
+ _ -> not_found
+ end, State);
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -1082,6 +1109,17 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin,
SourceNameBin, exchange, DestinationNameBin, RoutingKey,
Arguments, #'exchange.unbind_ok'{}, NoWait, State);
+%% Note that all declares to these are effectively passive
+handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
+ _/binary>> = QueueNameBin,
+ nowait = NoWait}, _,
+ State = #ch{virtual_host = VHost}) ->
+ QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
+ case declare_fast_reply_to(QueueNameBin) of
+ exists -> return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
+ not_found -> rabbit_misc:not_found(QueueName)
+ end;
+
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = DurableDeclare,