diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-01 18:31:57 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-01 18:31:57 +0100 |
commit | 0aa36d41b30e68582de75436d029ccf9b8a62509 (patch) | |
tree | 7d17d2a47e5d2247d15cd15beebb8a62565ea44f | |
parent | 77519b59e3c7e560afa239b78a1ecb2e2350b060 (diff) | |
parent | ef4b180669dc1b36717982a74682b8edee5873f7 (diff) | |
download | rabbitmq-server-0aa36d41b30e68582de75436d029ccf9b8a62509.tar.gz |
merge default into bug20284
-rw-r--r-- | src/rabbit_channel.erl | 76 |
1 files changed, 36 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 722eff37..a755b073 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -360,14 +360,10 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> - NewState = State#ch{most_recently_declared_queue = ActualName}, - case NoWait of - true -> {noreply, NewState}; - false -> Reply = #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} - end. + return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, + #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(Username, Resource, Perm) -> V = {Resource, Perm}, @@ -389,30 +385,30 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configure_permitted(Resource, #ch{ username = Username}) -> +check_configure_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, configure). -check_write_permitted(Resource, #ch{ username = Username}) -> +check_write_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, write). -check_read_permitted(Resource, #ch{ username = Username}) -> +check_read_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, read). -expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, - most_recently_declared_queue = MRDQ }) -> +expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, + most_recently_declared_queue = MRDQ}) -> rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> +expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, queue, QueueNameBin). expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = <<>> }) -> + #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = MRDQ }) -> + #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -530,11 +526,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unconfirmed = gb_sets:add(Count, State#ch.unconfirmed)}} end, - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -574,9 +570,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ writer_pid = WriterPid, - reader_pid = ReaderPid, - next_tag = DeliveryTag }) -> + _, State = #ch{writer_pid = WriterPid, + reader_pid = ReaderPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -606,15 +602,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, {reply, #'basic.get_empty'{}, State} end; -handle_method(#'basic.consume'{queue = QueueNameBin, +handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, - no_local = _, % FIXME: implement - no_ack = NoAck, - exclusive = ExclusiveConsume, - nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid, - limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping }) -> + no_local = _, % FIXME: implement + no_ack = NoAck, + exclusive = ExclusiveConsume, + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid, + limiter_pid = LimiterPid, + consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -709,7 +705,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{ unacked_message_q = UAMQ }) -> + _, State = #ch{unacked_message_q = UAMQ}) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -724,8 +720,8 @@ handle_method(#'basic.recover_async'{requeue = true}, {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{ writer_pid = WriterPid, - unacked_message_q = UAMQ }) -> + _, State = #ch{writer_pid = WriterPid, + unacked_message_q = UAMQ}) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> %% Was sent as a basic.get_ok. Don't redeliver @@ -758,7 +754,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{ unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -775,7 +771,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, internal = false, nowait = NoWait, arguments = Args}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), @@ -803,7 +799,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), _ = rabbit_exchange:lookup_or_die(ExchangeName), @@ -812,7 +808,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, nowait = NoWait}, - _, State = #ch { virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of |