diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-01 18:31:32 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-10-01 18:31:32 +0100 |
commit | ef4b180669dc1b36717982a74682b8edee5873f7 (patch) | |
tree | 10550395b554ea6cc57bdcee748c6a3dc2105538 | |
parent | 32e2295f84361aec3550bed9c7ba01eb863fd6d6 (diff) | |
download | rabbitmq-server-ef4b180669dc1b36717982a74682b8edee5873f7.tar.gz |
cosmetic and minor refactoring
-rw-r--r-- | src/rabbit_channel.erl | 77 |
1 files changed, 37 insertions, 40 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c9b33b60..fe36cef9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -315,14 +315,10 @@ terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> - NewState = State#ch{most_recently_declared_queue = ActualName}, - case NoWait of - true -> {noreply, NewState}; - false -> Reply = #'queue.declare_ok'{queue = ActualName, - message_count = MessageCount, - consumer_count = ConsumerCount}, - {reply, Reply, NewState} - end. + return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait, + #'queue.declare_ok'{queue = ActualName, + message_count = MessageCount, + consumer_count = ConsumerCount}). check_resource_access(Username, Resource, Perm) -> V = {Resource, Perm}, @@ -344,30 +340,30 @@ clear_permission_cache() -> erase(permission_cache), ok. -check_configure_permitted(Resource, #ch{ username = Username}) -> +check_configure_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, configure). -check_write_permitted(Resource, #ch{ username = Username}) -> +check_write_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, write). -check_read_permitted(Resource, #ch{ username = Username}) -> +check_read_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, read). -expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> +expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); -expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, - most_recently_declared_queue = MRDQ }) -> +expand_queue_name_shortcut(<<>>, #ch{virtual_host = VHostPath, + most_recently_declared_queue = MRDQ}) -> rabbit_misc:r(VHostPath, queue, MRDQ); -expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> +expand_queue_name_shortcut(QueueNameBin, #ch{virtual_host = VHostPath}) -> rabbit_misc:r(VHostPath, queue, QueueNameBin). expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = <<>> }) -> + #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, - #ch{ most_recently_declared_queue = MRDQ }) -> + #ch{most_recently_declared_queue = MRDQ}) -> MRDQ; expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. @@ -438,11 +434,11 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = DecodedContent, - guid = rabbit_guid:guid(), - is_persistent = IsPersistent}, + Message = #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = DecodedContent, + guid = rabbit_guid:guid(), + is_persistent = IsPersistent}, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -481,9 +477,9 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ writer_pid = WriterPid, - reader_pid = ReaderPid, - next_tag = DeliveryTag }) -> + _, State = #ch{writer_pid = WriterPid, + reader_pid = ReaderPid, + next_tag = DeliveryTag}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -513,15 +509,15 @@ handle_method(#'basic.get'{queue = QueueNameBin, {reply, #'basic.get_empty'{}, State} end; -handle_method(#'basic.consume'{queue = QueueNameBin, +handle_method(#'basic.consume'{queue = QueueNameBin, consumer_tag = ConsumerTag, - no_local = _, % FIXME: implement - no_ack = NoAck, - exclusive = ExclusiveConsume, - nowait = NoWait}, - _, State = #ch{ reader_pid = ReaderPid, - limiter_pid = LimiterPid, - consumer_mapping = ConsumerMapping }) -> + no_local = _, % FIXME: implement + no_ack = NoAck, + exclusive = ExclusiveConsume, + nowait = NoWait}, + _, State = #ch{reader_pid = ReaderPid, + limiter_pid = LimiterPid, + consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), @@ -616,7 +612,7 @@ handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover_async'{requeue = true}, - _, State = #ch{ unacked_message_q = UAMQ }) -> + _, State = #ch{unacked_message_q = UAMQ}) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> %% The Qpid python test suite incorrectly assumes @@ -631,8 +627,8 @@ handle_method(#'basic.recover_async'{requeue = true}, {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{ writer_pid = WriterPid, - unacked_message_q = UAMQ }) -> + _, State = #ch{writer_pid = WriterPid, + unacked_message_q = UAMQ}) -> ok = rabbit_misc:queue_fold( fun ({_DeliveryTag, none, _Msg}, ok) -> %% Was sent as a basic.get_ok. Don't redeliver @@ -665,7 +661,7 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue}, - _, State = #ch{ unacked_message_q = UAMQ}) -> + _, State = #ch{unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -682,7 +678,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, internal = false, nowait = NoWait, arguments = Args}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), @@ -710,7 +706,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = true, nowait = NoWait}, - _, State = #ch{ virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), _ = rabbit_exchange:lookup_or_die(ExchangeName), @@ -719,7 +715,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, handle_method(#'exchange.delete'{exchange = ExchangeNameBin, if_unused = IfUnused, nowait = NoWait}, - _, State = #ch { virtual_host = VHostPath }) -> + _, State = #ch{virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of @@ -873,6 +869,7 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; + handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> |