diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-18 14:39:05 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-18 14:39:05 +0100 |
commit | 262692bcbcf35a918c40cc764e22f70fc4508027 (patch) | |
tree | 40fb242443d218a799b0d5f8cf1864d8ae155bef | |
parent | 68b2e6e0f2b9fae37ad4a72fcb7bcc9a4dd4f188 (diff) | |
download | rabbitmq-server-262692bcbcf35a918c40cc764e22f70fc4508027.tar.gz |
Implement declaration.
-rw-r--r-- | src/rabbit_channel.erl | 42 |
1 files changed, 40 insertions, 2 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b61a53f8..472091d0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -147,8 +147,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) -> - [PidEnc, Key] = binary:split(Rest, <<".">>), - Pid = binary_to_term(base64:decode(PidEnc)), + {ok, Pid, Key} = decode_fast_reply_to(Rest), delegate:invoke_no_result( Pid, {?MODULE, deliver_reply_local, [Key, Delivery]}). @@ -160,6 +159,27 @@ deliver_reply_local(Pid, Key, Delivery) -> false -> ok end. +declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) -> + exists; +declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) -> + case decode_fast_reply_to(Rest) of + {ok, Pid, Key} -> + rabbit_misc:with_exit_handler( + rabbit_misc:const(not_found), + fun() -> gen_server2:call(Pid, {declare_fast_reply_to, Key}) end); + error -> + not_found + end; +declare_fast_reply_to(_) -> + not_found. + +decode_fast_reply_to(Suffix) -> + case binary:split(Suffix, <<".">>) of + [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)), + {ok, Pid, Key}; + _ -> error + end. + send_credit_reply(Pid, Len) -> gen_server2:cast(Pid, {send_credit_reply, Len}). @@ -281,6 +301,13 @@ handle_call({info, Items}, _From, State) -> handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); +handle_call({declare_fast_reply_to, Key}, _From, + State = #ch{reply_consumer = Consumer}) -> + reply(case Consumer of + {_, _, Key} -> exists; + _ -> not_found + end, State); + handle_call(_Request, _From, State) -> noreply(State). @@ -1082,6 +1109,17 @@ handle_method(#'exchange.unbind'{destination = DestinationNameBin, SourceNameBin, exchange, DestinationNameBin, RoutingKey, Arguments, #'exchange.unbind_ok'{}, NoWait, State); +%% Note that all declares to these are effectively passive +handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to", + _/binary>> = QueueNameBin, + nowait = NoWait}, _, + State = #ch{virtual_host = VHost}) -> + QueueName = rabbit_misc:r(VHost, queue, QueueNameBin), + case declare_fast_reply_to(QueueNameBin) of + exists -> return_queue_declare_ok(QueueName, NoWait, 0, 0, State); + not_found -> rabbit_misc:not_found(QueueName) + end; + handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, |