summaryrefslogtreecommitdiff
path: root/src/rabbit_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_channel.erl')
-rw-r--r--src/rabbit_channel.erl1871
1 files changed, 0 insertions, 1871 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
deleted file mode 100644
index 13cc925c..00000000
--- a/src/rabbit_channel.erl
+++ /dev/null
@@ -1,1871 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
-%%
-
--module(rabbit_channel).
--include("rabbit_framing.hrl").
--include("rabbit.hrl").
-
--behaviour(gen_server2).
-
--export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, deliver_reply/2,
- send_credit_reply/2, send_drained/2]).
--export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([refresh_config_local/0, ready_for_close/1]).
--export([force_event_refresh/1]).
-
--export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
- prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
-%% Internal
--export([list_local/0, deliver_reply_local/3]).
-
--record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- conn_name, limiter, tx, next_tag, unacked_message_q, user,
- virtual_host, most_recently_declared_queue,
- queue_names, queue_monitors, consumer_mapping,
- queue_consumers, delivering_queues,
- queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, mandatory, capabilities, trace_state,
- consumer_prefetch, reply_consumer}).
-
--define(MAX_PERMISSION_CACHE_SIZE, 12).
-
--define(STATISTICS_KEYS,
- [pid,
- transactional,
- confirm,
- consumer_count,
- messages_unacknowledged,
- messages_unconfirmed,
- messages_uncommitted,
- acks_uncommitted,
- prefetch_count,
- global_prefetch_count,
- state]).
-
--define(CREATION_EVENT_KEYS,
- [pid,
- name,
- connection,
- number,
- user,
- vhost]).
-
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
-
--define(INCR_STATS(Incs, Measure, State),
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> incr_stats(Incs, Measure);
- _ -> ok
- end).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--export_type([channel_number/0]).
-
--type(channel_number() :: non_neg_integer()).
-
--spec(start_link/11 ::
- (channel_number(), pid(), pid(), pid(), string(),
- rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(), pid(), pid()) ->
- rabbit_types:ok_pid_or_error()).
--spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
--spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
- rabbit_types:maybe(rabbit_types:content())) -> 'ok').
--spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
- rabbit_types:maybe(rabbit_types:content())) -> 'ok').
--spec(flush/1 :: (pid()) -> 'ok').
--spec(shutdown/1 :: (pid()) -> 'ok').
--spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
--spec(deliver/4 ::
- (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
- -> 'ok').
--spec(deliver_reply/2 :: (binary(), rabbit_types:delivery()) -> 'ok').
--spec(deliver_reply_local/3 ::
- (pid(), binary(), rabbit_types:delivery()) -> 'ok').
--spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(send_drained/2 :: (pid(), [{rabbit_types:ctag(), non_neg_integer()}])
- -> 'ok').
--spec(list/0 :: () -> [pid()]).
--spec(list_local/0 :: () -> [pid()]).
--spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(info/1 :: (pid()) -> rabbit_types:infos()).
--spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
--spec(info_all/0 :: () -> [rabbit_types:infos()]).
--spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
--spec(refresh_config_local/0 :: () -> 'ok').
--spec(ready_for_close/1 :: (pid()) -> 'ok').
--spec(force_event_refresh/1 :: (reference()) -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
- VHost, Capabilities, CollectorPid, Limiter) ->
- gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
- User, VHost, Capabilities, CollectorPid, Limiter], []).
-
-do(Pid, Method) ->
- do(Pid, Method, none).
-
-do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content, noflow}).
-
-do_flow(Pid, Method, Content) ->
- credit_flow:send(Pid),
- gen_server2:cast(Pid, {method, Method, Content, flow}).
-
-flush(Pid) ->
- gen_server2:call(Pid, flush, infinity).
-
-shutdown(Pid) ->
- gen_server2:cast(Pid, terminate).
-
-send_command(Pid, Msg) ->
- gen_server2:cast(Pid, {command, Msg}).
-
-deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
-
-deliver_reply(<<"amq.rabbitmq.reply-to.", Rest/binary>>, Delivery) ->
- case decode_fast_reply_to(Rest) of
- {ok, Pid, Key} ->
- delegate:invoke_no_result(
- Pid, {?MODULE, deliver_reply_local, [Key, Delivery]});
- error ->
- ok
- end.
-
-%% We want to ensure people can't use this mechanism to send a message
-%% to an arbitrary process and kill it!
-deliver_reply_local(Pid, Key, Delivery) ->
- case pg_local:in_group(rabbit_channels, Pid) of
- true -> gen_server2:cast(Pid, {deliver_reply, Key, Delivery});
- false -> ok
- end.
-
-declare_fast_reply_to(<<"amq.rabbitmq.reply-to">>) ->
- exists;
-declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", Rest/binary>>) ->
- case decode_fast_reply_to(Rest) of
- {ok, Pid, Key} ->
- Msg = {declare_fast_reply_to, Key},
- rabbit_misc:with_exit_handler(
- rabbit_misc:const(not_found),
- fun() -> gen_server2:call(Pid, Msg, infinity) end);
- error ->
- not_found
- end;
-declare_fast_reply_to(_) ->
- not_found.
-
-decode_fast_reply_to(Rest) ->
- case string:tokens(binary_to_list(Rest), ".") of
- [PidEnc, Key] -> Pid = binary_to_term(base64:decode(PidEnc)),
- {ok, Pid, Key};
- _ -> error
- end.
-
-send_credit_reply(Pid, Len) ->
- gen_server2:cast(Pid, {send_credit_reply, Len}).
-
-send_drained(Pid, CTagCredit) ->
- gen_server2:cast(Pid, {send_drained, CTagCredit}).
-
-list() ->
- rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:cluster_nodes(running),
- rabbit_channel, list_local, []).
-
-list_local() ->
- pg_local:get_members(rabbit_channels).
-
-info_keys() -> ?INFO_KEYS.
-
-info(Pid) ->
- gen_server2:call(Pid, info, infinity).
-
-info(Pid, Items) ->
- case gen_server2:call(Pid, {info, Items}, infinity) of
- {ok, Res} -> Res;
- {error, Error} -> throw(Error)
- end.
-
-info_all() ->
- rabbit_misc:filter_exit_map(fun (C) -> info(C) end, list()).
-
-info_all(Items) ->
- rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
-
-refresh_config_local() ->
- rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_config, infinity) end,
- list_local()),
- ok.
-
-ready_for_close(Pid) ->
- gen_server2:cast(Pid, ready_for_close).
-
-force_event_refresh(Ref) ->
- [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
- ok.
-
-%%---------------------------------------------------------------------------
-
-init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
- Capabilities, CollectorPid, LimiterPid]) ->
- process_flag(trap_exit, true),
- ?store_proc_name({ConnName, Channel}),
- ok = pg_local:join(rabbit_channels, self()),
- State = #ch{state = starting,
- protocol = Protocol,
- channel = Channel,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- limiter = rabbit_limiter:new(LimiterPid),
- tx = none,
- next_tag = 1,
- unacked_message_q = queue:new(),
- user = User,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- queue_names = dict:new(),
- queue_monitors = pmon:new(),
- consumer_mapping = dict:new(),
- queue_consumers = dict:new(),
- delivering_queues = sets:new(),
- queue_collector_pid = CollectorPid,
- confirm_enabled = false,
- publish_seqno = 1,
- unconfirmed = dtree:empty(),
- confirmed = [],
- mandatory = dtree:empty(),
- capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost),
- consumer_prefetch = 0,
- reply_consumer = none},
- State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
- rabbit_event:if_enabled(State1, #ch.stats_timer,
- fun() -> emit_stats(State1) end),
- {ok, State1, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-prioritise_call(Msg, _From, _Len, _State) ->
- case Msg of
- info -> 9;
- {info, _Items} -> 9;
- _ -> 0
- end.
-
-prioritise_cast(Msg, _Len, _State) ->
- case Msg of
- {confirm, _MsgSeqNos, _QPid} -> 5;
- {mandatory_received, _MsgSeqNo, _QPid} -> 5;
- _ -> 0
- end.
-
-prioritise_info(Msg, _Len, _State) ->
- case Msg of
- emit_stats -> 7;
- _ -> 0
- end.
-
-handle_call(flush, _From, State) ->
- reply(ok, State);
-
-handle_call(info, _From, State) ->
- reply(infos(?INFO_KEYS, State), State);
-
-handle_call({info, Items}, _From, State) ->
- try
- reply({ok, infos(Items, State)}, State)
- catch Error -> reply({error, Error}, State)
- end;
-
-handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
- reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
-
-handle_call({declare_fast_reply_to, Key}, _From,
- State = #ch{reply_consumer = Consumer}) ->
- reply(case Consumer of
- {_, _, Key} -> exists;
- _ -> not_found
- end, State);
-
-handle_call(_Request, _From, State) ->
- noreply(State).
-
-handle_cast({method, Method, Content, Flow},
- State = #ch{reader_pid = Reader,
- virtual_host = VHost}) ->
- case Flow of
- flow -> credit_flow:ack(Reader);
- noflow -> ok
- end,
- try handle_method(rabbit_channel_interceptor:intercept_method(
- expand_shortcuts(Method, State), VHost),
- Content, State) of
- {reply, Reply, NewState} ->
- ok = send(Reply, NewState),
- noreply(NewState);
- {noreply, NewState} ->
- noreply(NewState);
- stop ->
- {stop, normal, State}
- catch
- exit:Reason = #amqp_error{} ->
- MethodName = rabbit_misc:method_record_type(Method),
- handle_exception(Reason#amqp_error{method = MethodName}, State);
- _:Reason ->
- {stop, {Reason, erlang:get_stacktrace()}, State}
- end;
-
-handle_cast(ready_for_close, State = #ch{state = closing,
- writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
- {stop, normal, State};
-
-handle_cast(terminate, State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:flush(WriterPid),
- {stop, normal, State};
-
-handle_cast({command, #'basic.consume_ok'{consumer_tag = CTag} = Msg}, State) ->
- ok = send(Msg, State),
- noreply(consumer_monitor(CTag, State));
-
-handle_cast({command, Msg}, State) ->
- ok = send(Msg, State),
- noreply(State);
-
-handle_cast({deliver, _CTag, _AckReq, _Msg}, State = #ch{state = closing}) ->
- noreply(State);
-handle_cast({deliver, ConsumerTag, AckRequired,
- Msg = {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
- ok = rabbit_writer:send_command_and_notify(
- WriterPid, QPid, self(),
- #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- Content),
- rabbit_basic:maybe_gc_large_msg(Content),
- noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
-
-handle_cast({deliver_reply, _K, _Del}, State = #ch{state = closing}) ->
- noreply(State);
-handle_cast({deliver_reply, _K, _Del}, State = #ch{reply_consumer = none}) ->
- noreply(State);
-handle_cast({deliver_reply, Key, #delivery{message =
- #basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- reply_consumer = {ConsumerTag, _Suffix, Key}}) ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = false,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- Content),
- noreply(State);
-handle_cast({deliver_reply, _K1, _}, State=#ch{reply_consumer = {_, _, _K2}}) ->
- noreply(State);
-
-handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(
- WriterPid, #'basic.credit_ok'{available = Len}),
- noreply(State);
-
-handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
- [ok = rabbit_writer:send_command(
- WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag,
- credit_drained = CreditDrained})
- || {ConsumerTag, CreditDrained} <- CTagCredit],
- noreply(State);
-
-handle_cast({force_event_refresh, Ref}, State) ->
- rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
- Ref),
- noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
-
-handle_cast({mandatory_received, MsgSeqNo}, State = #ch{mandatory = Mand}) ->
- %% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(State#ch{mandatory = dtree:drop(MsgSeqNo, Mand)});
-
-handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
- {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
- %% NB: don't call noreply/1 since we don't want to send confirms.
- noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
-
-handle_info({bump_credit, Msg}, State) ->
- credit_flow:handle_bump_msg(Msg),
- noreply(State);
-
-handle_info(timeout, State) ->
- noreply(State);
-
-handle_info(emit_stats, State) ->
- emit_stats(State),
- State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
- %% NB: don't call noreply/1 since we don't want to kick off the
- %% stats timer.
- {noreply, send_confirms(State1), hibernate};
-
-handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
- State1 = handle_publishing_queue_down(QPid, Reason, State),
- State3 = handle_consuming_queue_down(QPid, State1),
- State4 = handle_delivering_queue_down(QPid, State3),
- credit_flow:peer_down(QPid),
- #ch{queue_names = QNames, queue_monitors = QMons} = State4,
- case dict:find(QPid, QNames) of
- {ok, QName} -> erase_queue_stats(QName);
- error -> ok
- end,
- noreply(State4#ch{queue_names = dict:erase(QPid, QNames),
- queue_monitors = pmon:erase(QPid, QMons)});
-
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State}.
-
-handle_pre_hibernate(State) ->
- ok = clear_permission_cache(),
- rabbit_event:if_enabled(
- State, #ch.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
- {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
-
-terminate(Reason, State) ->
- {Res, _State1} = notify_queues(State),
- case Reason of
- normal -> ok = Res;
- shutdown -> ok = Res;
- {shutdown, _Term} -> ok = Res;
- _ -> ok
- end,
- pg_local:leave(rabbit_channels, self()),
- rabbit_event:if_enabled(State, #ch.stats_timer,
- fun() -> emit_stats(State) end),
- rabbit_event:notify(channel_closed, [{pid, self()}]).
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
-
-%%---------------------------------------------------------------------------
-
-reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
-
-noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
-
-next_state(State) -> ensure_stats_timer(send_confirms(State)).
-
-noreply_coalesce(State = #ch{confirmed = C}) ->
- Timeout = case C of [] -> hibernate; _ -> 0 end,
- {noreply, ensure_stats_timer(State), Timeout}.
-
-ensure_stats_timer(State) ->
- rabbit_event:ensure_stats_timer(State, #ch.stats_timer, emit_stats).
-
-return_ok(State, true, _Msg) -> {noreply, State};
-return_ok(State, false, Msg) -> {reply, Msg, State}.
-
-ok_msg(true, _Msg) -> undefined;
-ok_msg(false, Msg) -> Msg.
-
-send(_Command, #ch{state = closing}) ->
- ok;
-send(Command, #ch{writer_pid = WriterPid}) ->
- ok = rabbit_writer:send_command(WriterPid, Command).
-
-handle_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid,
- conn_name = ConnName,
- virtual_host = VHost,
- user = User}) ->
- %% something bad's happened: notify_queues may not be 'ok'
- {_Result, State1} = notify_queues(State),
- case rabbit_binary_generator:map_exception(Channel, Reason, Protocol) of
- {Channel, CloseMethod} ->
- rabbit_log:error("Channel error on connection ~p (~s, vhost: '~s',"
- " user: '~s'), channel ~p:~n~p~n",
- [ConnPid, ConnName, VHost, User#user.username,
- Channel, Reason]),
- ok = rabbit_writer:send_command(WriterPid, CloseMethod),
- {noreply, State1};
- {0, _} ->
- ReaderPid ! {channel_exit, Channel, Reason},
- {stop, normal, State1}
- end.
-
--ifdef(use_specs).
--spec(precondition_failed/1 :: (string()) -> no_return()).
--endif.
-precondition_failed(Format) -> precondition_failed(Format, []).
-
--ifdef(use_specs).
--spec(precondition_failed/2 :: (string(), [any()]) -> no_return()).
--endif.
-precondition_failed(Format, Params) ->
- rabbit_misc:protocol_error(precondition_failed, Format, Params).
-
-return_queue_declare_ok(#resource{name = ActualName},
- NoWait, MessageCount, ConsumerCount, State) ->
- return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
- #'queue.declare_ok'{queue = ActualName,
- message_count = MessageCount,
- consumer_count = ConsumerCount}).
-
-check_resource_access(User, Resource, Perm) ->
- V = {Resource, Perm},
- Cache = case get(permission_cache) of
- undefined -> [];
- Other -> Other
- end,
- case lists:member(V, Cache) of
- true -> ok;
- false -> ok = rabbit_access_control:check_resource_access(
- User, Resource, Perm),
- CacheTail = lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE-1),
- put(permission_cache, [V | CacheTail])
- end.
-
-clear_permission_cache() -> erase(permission_cache),
- ok.
-
-check_configure_permitted(Resource, #ch{user = User}) ->
- check_resource_access(User, Resource, configure).
-
-check_write_permitted(Resource, #ch{user = User}) ->
- check_resource_access(User, Resource, write).
-
-check_read_permitted(Resource, #ch{user = User}) ->
- check_resource_access(User, Resource, read).
-
-check_user_id_header(#'P_basic'{user_id = undefined}, _) ->
- ok;
-check_user_id_header(#'P_basic'{user_id = Username},
- #ch{user = #user{username = Username}}) ->
- ok;
-check_user_id_header(
- #'P_basic'{}, #ch{user = #user{authz_backends =
- [{rabbit_auth_backend_dummy, _}]}}) ->
- ok;
-check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual,
- tags = Tags}}) ->
- case lists:member(impersonator, Tags) of
- true -> ok;
- false -> precondition_failed(
- "user_id property set to '~s' but authenticated user was "
- "'~s'", [Claimed, Actual])
- end.
-
-check_expiration_header(Props) ->
- case rabbit_basic:parse_expiration(Props) of
- {ok, _} -> ok;
- {error, E} -> precondition_failed("invalid expiration '~s': ~p",
- [Props#'P_basic'.expiration, E])
- end.
-
-check_internal_exchange(#exchange{name = Name, internal = true}) ->
- rabbit_misc:protocol_error(access_refused,
- "cannot publish to internal ~s",
- [rabbit_misc:rs(Name)]);
-check_internal_exchange(_) ->
- ok.
-
-check_msg_size(Content) ->
- Size = rabbit_basic:maybe_gc_large_msg(Content),
- case Size > ?MAX_MSG_SIZE of
- true -> precondition_failed("message size ~B larger than max size ~B",
- [Size, ?MAX_MSG_SIZE]);
- false -> ok
- end.
-
-qbin_to_resource(QueueNameBin, State) ->
- name_to_resource(queue, QueueNameBin, State).
-
-name_to_resource(Type, NameBin, #ch{virtual_host = VHostPath}) ->
- rabbit_misc:r(VHostPath, Type, NameBin).
-
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
-expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = MRDQ}) ->
- MRDQ;
-expand_queue_name_shortcut(QueueNameBin, _) ->
- QueueNameBin.
-
-expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = <<>>}) ->
- rabbit_misc:protocol_error(not_found, "no previously declared queue", []);
-expand_routing_key_shortcut(<<>>, <<>>,
- #ch{most_recently_declared_queue = MRDQ}) ->
- MRDQ;
-expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
- RoutingKey.
-
-expand_shortcuts(#'basic.get' {queue = Q} = M, State) ->
- M#'basic.get' {queue = expand_queue_name_shortcut(Q, State)};
-expand_shortcuts(#'basic.consume'{queue = Q} = M, State) ->
- M#'basic.consume'{queue = expand_queue_name_shortcut(Q, State)};
-expand_shortcuts(#'queue.delete' {queue = Q} = M, State) ->
- M#'queue.delete' {queue = expand_queue_name_shortcut(Q, State)};
-expand_shortcuts(#'queue.purge' {queue = Q} = M, State) ->
- M#'queue.purge' {queue = expand_queue_name_shortcut(Q, State)};
-expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, State) ->
- M#'queue.bind' {queue = expand_queue_name_shortcut(Q, State),
- routing_key = expand_routing_key_shortcut(Q, K, State)};
-expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, State) ->
- M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, State),
- routing_key = expand_routing_key_shortcut(Q, K, State)};
-expand_shortcuts(M, _State) ->
- M.
-
-check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
- rabbit_misc:protocol_error(
- access_refused, "operation not permitted on the default exchange", []);
-check_not_default_exchange(_) ->
- ok.
-
-check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
- kind = exchange}) ->
- rabbit_misc:protocol_error(
- access_refused, "deletion of system ~s not allowed",
- [rabbit_misc:rs(XName)]);
-check_exchange_deletion(_) ->
- ok.
-
-%% check that an exchange/queue name does not contain the reserved
-%% "amq." prefix.
-%%
-%% As per the AMQP 0-9-1 spec, the exclusion of "amq." prefixed names
-%% only applies on actual creation, and not in the cases where the
-%% entity already exists or passive=true.
-%%
-%% NB: We deliberately do not enforce the other constraints on names
-%% required by the spec.
-check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
- rabbit_misc:protocol_error(
- access_refused,
- "~s name '~s' contains reserved prefix 'amq.*'",[Kind, NameBin]);
-check_name(_Kind, NameBin) ->
- NameBin.
-
-maybe_set_fast_reply_to(
- C = #content{properties = P = #'P_basic'{reply_to =
- <<"amq.rabbitmq.reply-to">>}},
- #ch{reply_consumer = ReplyConsumer}) ->
- case ReplyConsumer of
- none -> rabbit_misc:protocol_error(
- precondition_failed,
- "fast reply consumer does not exist", []);
- {_, Suf, _K} -> Rep = <<"amq.rabbitmq.reply-to.", Suf/binary>>,
- rabbit_binary_generator:clear_encoded_content(
- C#content{properties = P#'P_basic'{reply_to = Rep}})
- end;
-maybe_set_fast_reply_to(C, _State) ->
- C.
-
-record_confirms([], State) ->
- State;
-record_confirms(MXs, State = #ch{confirmed = C}) ->
- State#ch{confirmed = [MXs | C]}.
-
-handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
- State1 = State#ch{state = running},
- rabbit_event:if_enabled(State1, #ch.stats_timer,
- fun() -> emit_stats(State1) end),
- {reply, #'channel.open_ok'{}, State1};
-
-handle_method(#'channel.open'{}, _, _State) ->
- rabbit_misc:protocol_error(
- command_invalid, "second 'channel.open' seen", []);
-
-handle_method(_Method, _, #ch{state = starting}) ->
- rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
-
-handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) ->
- stop;
-
-handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid,
- state = closing}) ->
- ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- {noreply, State};
-
-handle_method(_Method, _, State = #ch{state = closing}) ->
- {noreply, State};
-
-handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
- {ok, State1} = notify_queues(State),
- %% We issue the channel.close_ok response after a handshake with
- %% the reader, the other half of which is ready_for_close. That
- %% way the reader forgets about the channel before we send the
- %% response (and this channel process terminates). If we didn't do
- %% that, a channel.open for the same channel number, which a
- %% client is entitled to send as soon as it has received the
- %% close_ok, might be received by the reader before it has seen
- %% the termination and hence be sent to the old, now dead/dying
- %% channel process, instead of a new process, and thus lost.
- ReaderPid ! {channel_closing, self()},
- {noreply, State1};
-
-%% Even though the spec prohibits the client from sending commands
-%% while waiting for the reply to a synchronous command, we generally
-%% do allow this...except in the case of a pending tx.commit, where
-%% it could wreak havoc.
-handle_method(_Method, _, #ch{tx = Tx})
- when Tx =:= committing orelse Tx =:= failed ->
- rabbit_misc:protocol_error(
- channel_error, "unexpected command while processing 'tx.commit'", []);
-
-handle_method(#'access.request'{},_, State) ->
- {reply, #'access.request_ok'{ticket = 1}, State};
-
-handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
- rabbit_misc:protocol_error(not_implemented, "immediate=true", []);
-
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- mandatory = Mandatory},
- Content, State = #ch{virtual_host = VHostPath,
- tx = Tx,
- channel = ChannelNum,
- confirm_enabled = ConfirmEnabled,
- trace_state = TraceState,
- user = #user{username = Username},
- conn_name = ConnName}) ->
- check_msg_size(Content),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_write_permitted(ExchangeName, State),
- Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
- check_internal_exchange(Exchange),
- %% We decode the content's properties here because we're almost
- %% certain to want to look at delivery-mode and priority.
- DecodedContent = #content {properties = Props} =
- maybe_set_fast_reply_to(
- rabbit_binary_parser:ensure_content_decoded(Content), State),
- check_user_id_header(Props, State),
- check_expiration_header(Props),
- DoConfirm = Tx =/= none orelse ConfirmEnabled,
- {MsgSeqNo, State1} =
- case DoConfirm orelse Mandatory of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
- end,
- case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
- {ok, Message} ->
- rabbit_trace:tap_in(Message, ConnName, ChannelNum,
- Username, TraceState),
- Delivery = rabbit_basic:delivery(
- Mandatory, DoConfirm, Message, MsgSeqNo),
- QNames = rabbit_exchange:route(Exchange, Delivery),
- DQ = {Delivery, QNames},
- {noreply, case Tx of
- none -> deliver_to_queues(DQ, State1);
- {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
- State1#ch{tx = {Msgs1, Acks}}
- end};
- {error, Reason} ->
- precondition_failed("invalid message: ~p", [Reason])
- end;
-
-handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
- multiple = Multiple,
- requeue = Requeue}, _, State) ->
- reject(DeliveryTag, Requeue, Multiple, State);
-
-handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
- multiple = Multiple},
- _, State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- State1 = State#ch{unacked_message_q = Remaining},
- {noreply, case Tx of
- none -> ack(Acked, State1),
- State1;
- {Msgs, Acks} -> Acks1 = ack_cons(ack, Acked, Acks),
- State1#ch{tx = {Msgs, Acks1}}
- end};
-
-handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- limiter = Limiter,
- next_tag = DeliveryTag}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:basic_get(
- Q, self(), NoAck, rabbit_limiter:pid(Limiter))
- end) of
- {ok, MessageCount,
- Msg = {QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
- message_count = MessageCount},
- Content),
- State1 = monitor_delivering_queue(NoAck, QPid, QName, State),
- {noreply, record_sent(none, not(NoAck), Msg, State1)};
- empty ->
- {reply, #'basic.get_empty'{}, State}
- end;
-
-handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
- consumer_tag = CTag0,
- no_ack = NoAck,
- nowait = NoWait},
- _, State = #ch{reply_consumer = ReplyConsumer,
- consumer_mapping = ConsumerMapping}) ->
- case dict:find(CTag0, ConsumerMapping) of
- error ->
- case {ReplyConsumer, NoAck} of
- {none, true} ->
- CTag = case CTag0 of
- <<>> -> rabbit_guid:binary(
- rabbit_guid:gen_secure(), "amq.ctag");
- Other -> Other
- end,
- %% Precalculate both suffix and key; base64 encoding is
- %% expensive
- Key = base64:encode(rabbit_guid:gen_secure()),
- PidEnc = base64:encode(term_to_binary(self())),
- Suffix = <<PidEnc/binary, ".", Key/binary>>,
- Consumer = {CTag, Suffix, binary_to_list(Key)},
- State1 = State#ch{reply_consumer = Consumer},
- case NoWait of
- true -> {noreply, State1};
- false -> Rep = #'basic.consume_ok'{consumer_tag = CTag},
- {reply, Rep, State1}
- end;
- {_, false} ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "reply consumer cannot acknowledge", []);
- _ ->
- rabbit_misc:protocol_error(
- precondition_failed, "reply consumer already set", [])
- end;
- {ok, _} ->
- %% Attempted reuse of consumer tag.
- rabbit_misc:protocol_error(
- not_allowed, "attempt to reuse consumer tag '~s'", [CTag0])
- end;
-
-handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
- _, State = #ch{reply_consumer = {ConsumerTag, _, _}}) ->
- State1 = State#ch{reply_consumer = none},
- case NoWait of
- true -> {noreply, State1};
- false -> Rep = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
- {reply, Rep, State1}
- end;
-
-handle_method(#'basic.consume'{queue = QueueNameBin,
- consumer_tag = ConsumerTag,
- no_local = _, % FIXME: implement
- no_ack = NoAck,
- exclusive = ExclusiveConsume,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch{consumer_prefetch = ConsumerPrefetch,
- consumer_mapping = ConsumerMapping}) ->
- case dict:find(ConsumerTag, ConsumerMapping) of
- error ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
- ActualConsumerTag =
- case ConsumerTag of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.ctag");
- Other -> Other
- end,
- case basic_consume(
- QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
- ExclusiveConsume, Args, NoWait, State) of
- {ok, State1} ->
- {noreply, State1};
- {error, exclusive_consume_unavailable} ->
- rabbit_misc:protocol_error(
- access_refused, "~s in exclusive use",
- [rabbit_misc:rs(QueueName)])
- end;
- {ok, _} ->
- %% Attempted reuse of consumer tag.
- rabbit_misc:protocol_error(
- not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
- end;
-
-handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
- _, State = #ch{consumer_mapping = ConsumerMapping,
- queue_consumers = QCons}) ->
- OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
- case dict:find(ConsumerTag, ConsumerMapping) of
- error ->
- %% Spec requires we ignore this situation.
- return_ok(State, NoWait, OkMsg);
- {ok, {Q = #amqqueue{pid = QPid}, _CParams}} ->
- ConsumerMapping1 = dict:erase(ConsumerTag, ConsumerMapping),
- QCons1 =
- case dict:find(QPid, QCons) of
- error -> QCons;
- {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
- case gb_sets:is_empty(CTags1) of
- true -> dict:erase(QPid, QCons);
- false -> dict:store(QPid, CTags1, QCons)
- end
- end,
- NewState = State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1},
- %% In order to ensure that no more messages are sent to
- %% the consumer after the cancel_ok has been sent, we get
- %% the queue process to send the cancel_ok on our
- %% behalf. If we were sending the cancel_ok ourselves it
- %% might overtake a message sent previously by the queue.
- case rabbit_misc:with_exit_handler(
- fun () -> {error, not_found} end,
- fun () ->
- rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg))
- end) of
- ok ->
- {noreply, NewState};
- {error, not_found} ->
- %% Spec requires we ignore this situation.
- return_ok(NewState, NoWait, OkMsg)
- end
- end;
-
-handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
- rabbit_misc:protocol_error(not_implemented,
- "prefetch_size!=0 (~w)", [Size]);
-
-handle_method(#'basic.qos'{global = false,
- prefetch_count = PrefetchCount}, _, State) ->
- {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
-
-handle_method(#'basic.qos'{global = true,
- prefetch_count = 0},
- _, State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-
-handle_method(#'basic.qos'{global = true,
- prefetch_count = PrefetchCount},
- _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- %% TODO queue:len(UAMQ) is not strictly right since that counts
- %% unacked messages from basic.get too. Pretty obscure though.
- Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
- PrefetchCount, queue:len(UAMQ)),
- case ((not rabbit_limiter:is_active(Limiter)) andalso
- rabbit_limiter:is_active(Limiter1)) of
- true -> rabbit_amqqueue:activate_limit_all(
- consumer_queues(State#ch.consumer_mapping), self());
- false -> ok
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-
-handle_method(#'basic.recover_async'{requeue = true},
- _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
- OkFun = fun () -> ok end,
- UAMQL = queue:to_list(UAMQ),
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- rabbit_misc:with_exit_handler(
- OkFun,
- fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
- end, lists:reverse(UAMQL)),
- ok = notify_limiter(Limiter, UAMQL),
- %% No answer required - basic.recover is the newer, synchronous
- %% variant of this method
- {noreply, State#ch{unacked_message_q = queue:new()}};
-
-handle_method(#'basic.recover_async'{requeue = false}, _, _State) ->
- rabbit_misc:protocol_error(not_implemented, "requeue=false", []);
-
-handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
- {noreply, State1} = handle_method(#'basic.recover_async'{requeue = Requeue},
- Content, State),
- {reply, #'basic.recover_ok'{}, State1};
-
-handle_method(#'basic.reject'{delivery_tag = DeliveryTag, requeue = Requeue},
- _, State) ->
- reject(DeliveryTag, Requeue, false, State);
-
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
- passive = false,
- durable = Durable,
- auto_delete = AutoDelete,
- internal = Internal,
- nowait = NoWait,
- arguments = Args},
- _, State = #ch{virtual_host = VHostPath}) ->
- CheckedType = rabbit_exchange:check_type(TypeNameBin),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- X = case rabbit_exchange:lookup(ExchangeName) of
- {ok, FoundX} -> FoundX;
- {error, not_found} ->
- check_name('exchange', ExchangeNameBin),
- AeKey = <<"alternate-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, AeKey) of
- undefined -> ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, AeKey, rabbit_misc:rs(ExchangeName)]);
- AName -> check_read_permitted(ExchangeName, State),
- check_write_permitted(AName, State),
- ok
- end,
- rabbit_exchange:declare(ExchangeName,
- CheckedType,
- Durable,
- AutoDelete,
- Internal,
- Args)
- end,
- ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
- AutoDelete, Internal, Args),
- return_ok(State, NoWait, #'exchange.declare_ok'{});
-
-handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath}) ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- _ = rabbit_exchange:lookup_or_die(ExchangeName),
- return_ok(State, NoWait, #'exchange.declare_ok'{});
-
-handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
- if_unused = IfUnused,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath}) ->
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_not_default_exchange(ExchangeName),
- check_exchange_deletion(ExchangeName),
- check_configure_permitted(ExchangeName, State),
- case rabbit_exchange:delete(ExchangeName, IfUnused) of
- {error, not_found} ->
- return_ok(State, NoWait, #'exchange.delete_ok'{});
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
- ok ->
- return_ok(State, NoWait, #'exchange.delete_ok'{})
- end;
-
-handle_method(#'exchange.bind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/2,
- SourceNameBin, exchange, DestinationNameBin, RoutingKey,
- Arguments, #'exchange.bind_ok'{}, NoWait, State);
-
-handle_method(#'exchange.unbind'{destination = DestinationNameBin,
- source = SourceNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/2,
- SourceNameBin, exchange, DestinationNameBin, RoutingKey,
- Arguments, #'exchange.unbind_ok'{}, NoWait, State);
-
-%% Note that all declares to these are effectively passive. If it
-%% exists it by definition has one consumer.
-handle_method(#'queue.declare'{queue = <<"amq.rabbitmq.reply-to",
- _/binary>> = QueueNameBin,
- nowait = NoWait}, _,
- State = #ch{virtual_host = VHost}) ->
- QueueName = rabbit_misc:r(VHost, queue, QueueNameBin),
- case declare_fast_reply_to(QueueNameBin) of
- exists -> return_queue_declare_ok(QueueName, NoWait, 0, 1, State);
- not_found -> rabbit_misc:not_found(QueueName)
- end;
-
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = false,
- durable = DurableDeclare,
- exclusive = ExclusiveDeclare,
- auto_delete = AutoDelete,
- nowait = NoWait,
- arguments = Args} = Declare,
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid,
- queue_collector_pid = CollectorPid}) ->
- Owner = case ExclusiveDeclare of
- true -> ConnPid;
- false -> none
- end,
- Durable = DurableDeclare andalso not ExclusiveDeclare,
- ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
- "amq.gen");
- Other -> check_name('queue', Other)
- end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
- Q, Durable, AutoDelete, Args, Owner),
- maybe_stat(NoWait, Q)
- end) of
- {ok, MessageCount, ConsumerCount} ->
- return_queue_declare_ok(QueueName, NoWait, MessageCount,
- ConsumerCount, State);
- {error, not_found} ->
- DlxKey = <<"x-dead-letter-exchange">>,
- case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
- undefined ->
- ok;
- {error, {invalid_type, Type}} ->
- precondition_failed(
- "invalid type '~s' for arg '~s' in ~s",
- [Type, DlxKey, rabbit_misc:rs(QueueName)]);
- DLX ->
- check_read_permitted(QueueName, State),
- check_write_permitted(DLX, State),
- ok
- end,
- case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner) of
- {new, #amqqueue{pid = QPid}} ->
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as
- %% the connection shuts down.
- ok = case Owner of
- none -> ok;
- _ -> rabbit_queue_collector:register(
- CollectorPid, QPid)
- end,
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
- {existing, _Q} ->
- %% must have been created between the stat and the
- %% declare. Loop around again.
- handle_method(Declare, none, State);
- {absent, Q, Reason} ->
- rabbit_misc:absent(Q, Reason);
- {owner_died, _Q} ->
- %% Presumably our own days are numbered since the
- %% connection has died. Pretend the queue exists though,
- %% just so nothing fails.
- return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
- end;
- {error, {absent, Q, Reason}} ->
- rabbit_misc:absent(Q, Reason)
- end;
-
-handle_method(#'queue.declare'{queue = QueueNameBin,
- passive = true,
- nowait = NoWait},
- _, State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid}) ->
- QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- {{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
- rabbit_amqqueue:with_or_die(
- QueueName, fun (Q) -> {maybe_stat(NoWait, Q), Q} end),
- ok = rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount,
- State);
-
-handle_method(#'queue.delete'{queue = QueueNameBin,
- if_unused = IfUnused,
- if_empty = IfEmpty,
- nowait = NoWait},
- _, State = #ch{conn_pid = ConnPid}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_configure_permitted(QueueName, State),
- case rabbit_amqqueue:with(
- QueueName,
- fun (Q) ->
- rabbit_amqqueue:check_exclusive_access(Q, ConnPid),
- rabbit_amqqueue:delete(Q, IfUnused, IfEmpty)
- end,
- fun (not_found) -> {ok, 0};
- ({absent, Q, crashed}) -> rabbit_amqqueue:delete_crashed(Q),
- {ok, 0};
- ({absent, Q, Reason}) -> rabbit_misc:absent(Q, Reason)
- end) of
- {error, in_use} ->
- precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
- {error, not_empty} ->
- precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
- {ok, PurgedMessageCount} ->
- return_ok(State, NoWait,
- #'queue.delete_ok'{message_count = PurgedMessageCount})
- end;
-
-handle_method(#'queue.bind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- nowait = NoWait,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:add/2,
- ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
- #'queue.bind_ok'{}, NoWait, State);
-
-handle_method(#'queue.unbind'{queue = QueueNameBin,
- exchange = ExchangeNameBin,
- routing_key = RoutingKey,
- arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_binding:remove/2,
- ExchangeNameBin, queue, QueueNameBin, RoutingKey, Arguments,
- #'queue.unbind_ok'{}, false, State);
-
-handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait},
- _, State = #ch{conn_pid = ConnPid}) ->
- QueueName = qbin_to_resource(QueueNameBin, State),
- check_read_permitted(QueueName, State),
- {ok, PurgedMessageCount} = rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:purge(Q) end),
- return_ok(State, NoWait,
- #'queue.purge_ok'{message_count = PurgedMessageCount});
-
-handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- precondition_failed("cannot switch from confirm to tx mode");
-
-handle_method(#'tx.select'{}, _, State = #ch{tx = none}) ->
- {reply, #'tx.select_ok'{}, State#ch{tx = new_tx()}};
-
-handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State};
-
-handle_method(#'tx.commit'{}, _, #ch{tx = none}) ->
- precondition_failed("channel is not transactional");
-
-handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks},
- limiter = Limiter}) ->
- State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs),
- Rev = fun (X) -> lists:reverse(lists:sort(X)) end,
- lists:foreach(fun ({ack, A}) -> ack(Rev(A), State1);
- ({Requeue, A}) -> reject(Requeue, Rev(A), Limiter)
- end, lists:reverse(Acks)),
- {noreply, maybe_complete_tx(State1#ch{tx = committing})};
-
-handle_method(#'tx.rollback'{}, _, #ch{tx = none}) ->
- precondition_failed("channel is not transactional");
-
-handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- tx = {_Msgs, Acks}}) ->
- AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])),
- UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))),
- {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1,
- tx = new_tx()}};
-
-handle_method(#'confirm.select'{}, _, #ch{tx = {_, _}}) ->
- precondition_failed("cannot switch from tx to confirm mode");
-
-handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
- return_ok(State#ch{confirm_enabled = true},
- NoWait, #'confirm.select_ok'{});
-
-handle_method(#'channel.flow'{active = true}, _, State) ->
- {reply, #'channel.flow_ok'{active = true}, State};
-
-handle_method(#'channel.flow'{active = false}, _, _State) ->
- rabbit_misc:protocol_error(not_implemented, "active=false", []);
-
-handle_method(#'basic.credit'{consumer_tag = CTag,
- credit = Credit,
- drain = Drain},
- _, State = #ch{consumer_mapping = Consumers}) ->
- case dict:find(CTag, Consumers) of
- {ok, {Q, _CParams}} -> ok = rabbit_amqqueue:credit(
- Q, self(), CTag, Credit, Drain),
- {noreply, State};
- error -> precondition_failed(
- "unknown consumer tag '~s'", [CTag])
- end;
-
-handle_method(_MethodRecord, _Content, _State) ->
- rabbit_misc:protocol_error(
- command_invalid, "unimplemented method", []).
-
-%%----------------------------------------------------------------------------
-
-%% We get the queue process to send the consume_ok on our behalf. This
-%% is for symmetry with basic.cancel - see the comment in that method
-%% for why.
-basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
- ExclusiveConsume, Args, NoWait,
- State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
- case rabbit_amqqueue:with_exclusive_access_or_die(
- QueueName, ConnPid,
- fun (Q) ->
- {rabbit_amqqueue:basic_consume(
- Q, NoAck, self(),
- rabbit_limiter:pid(Limiter),
- rabbit_limiter:is_active(Limiter),
- ConsumerPrefetch, ActualConsumerTag,
- ExclusiveConsume, Args,
- ok_msg(NoWait, #'basic.consume_ok'{
- consumer_tag = ActualConsumerTag})),
- Q}
- end) of
- {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
- CM1 = dict:store(
- ActualConsumerTag,
- {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
- ConsumerMapping),
- State1 = monitor_delivering_queue(
- NoAck, QPid, QName,
- State#ch{consumer_mapping = CM1}),
- {ok, case NoWait of
- true -> consumer_monitor(ActualConsumerTag, State1);
- false -> State1
- end};
- {{error, exclusive_consume_unavailable} = E, _Q} ->
- E
- end.
-
-maybe_stat(false, Q) -> rabbit_amqqueue:stat(Q);
-maybe_stat(true, _Q) -> {ok, 0, 0}.
-
-consumer_monitor(ConsumerTag,
- State = #ch{consumer_mapping = ConsumerMapping,
- queue_monitors = QMons,
- queue_consumers = QCons}) ->
- {#amqqueue{pid = QPid}, _CParams} =
- dict:fetch(ConsumerTag, ConsumerMapping),
- QCons1 = dict:update(QPid, fun (CTags) ->
- gb_sets:insert(ConsumerTag, CTags)
- end,
- gb_sets:singleton(ConsumerTag), QCons),
- State#ch{queue_monitors = pmon:monitor(QPid, QMons),
- queue_consumers = QCons1}.
-
-monitor_delivering_queue(NoAck, QPid, QName,
- State = #ch{queue_names = QNames,
- queue_monitors = QMons,
- delivering_queues = DQ}) ->
- State#ch{queue_names = dict:store(QPid, QName, QNames),
- queue_monitors = pmon:monitor(QPid, QMons),
- delivering_queues = case NoAck of
- true -> DQ;
- false -> sets:add_element(QPid, DQ)
- end}.
-
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC,
- mandatory = Mand}) ->
- {MMsgs, Mand1} = dtree:take(QPid, Mand),
- [basic_return(Msg, State, no_route) || {_, Msg} <- MMsgs],
- State1 = State#ch{mandatory = Mand1},
- case rabbit_misc:is_abnormal_exit(Reason) of
- true -> {MXs, UC1} = dtree:take_all(QPid, UC),
- send_nacks(MXs, State1#ch{unconfirmed = UC1});
- false -> {MXs, UC1} = dtree:take(QPid, UC),
- record_confirms(MXs, State1#ch{unconfirmed = UC1})
-
- end.
-
-handle_consuming_queue_down(QPid, State = #ch{queue_consumers = QCons,
- queue_names = QNames}) ->
- ConsumerTags = case dict:find(QPid, QCons) of
- error -> gb_sets:new();
- {ok, CTags} -> CTags
- end,
- gb_sets:fold(
- fun (CTag, StateN = #ch{consumer_mapping = CMap}) ->
- QName = dict:fetch(QPid, QNames),
- case queue_down_consumer_action(CTag, CMap) of
- remove ->
- cancel_consumer(CTag, QName, StateN);
- {recover, {NoAck, ConsumerPrefetch, Exclusive, Args}} ->
- case catch basic_consume( %% [0]
- QName, NoAck, ConsumerPrefetch, CTag,
- Exclusive, Args, true, StateN) of
- {ok, StateN1} -> StateN1;
- _ -> cancel_consumer(CTag, QName, StateN)
- end
- end
- end, State#ch{queue_consumers = dict:erase(QPid, QCons)}, ConsumerTags).
-
-%% [0] There is a slight danger here that if a queue is deleted and
-%% then recreated again the reconsume will succeed even though it was
-%% not an HA failover. But the likelihood is not great and most users
-%% are unlikely to care.
-
-cancel_consumer(CTag, QName, State = #ch{capabilities = Capabilities,
- consumer_mapping = CMap}) ->
- case rabbit_misc:table_lookup(
- Capabilities, <<"consumer_cancel_notify">>) of
- {bool, true} -> ok = send(#'basic.cancel'{consumer_tag = CTag,
- nowait = true}, State);
- _ -> ok
- end,
- rabbit_event:notify(consumer_deleted, [{consumer_tag, CTag},
- {channel, self()},
- {queue, QName}]),
- State#ch{consumer_mapping = dict:erase(CTag, CMap)}.
-
-queue_down_consumer_action(CTag, CMap) ->
- {_, {_, _, _, Args} = ConsumeSpec} = dict:fetch(CTag, CMap),
- case rabbit_misc:table_lookup(Args, <<"x-cancel-on-ha-failover">>) of
- {bool, true} -> remove;
- _ -> {recover, ConsumeSpec}
- end.
-
-handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
- State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-
-binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
- RoutingKey, Arguments, ReturnMethod, NoWait,
- State = #ch{virtual_host = VHostPath,
- conn_pid = ConnPid }) ->
- DestinationName = name_to_resource(DestinationType, DestinationNameBin, State),
- check_write_permitted(DestinationName, State),
- ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- [check_not_default_exchange(N) || N <- [DestinationName, ExchangeName]],
- check_read_permitted(ExchangeName, State),
- case Fun(#binding{source = ExchangeName,
- destination = DestinationName,
- key = RoutingKey,
- args = Arguments},
- fun (_X, Q = #amqqueue{}) ->
- try rabbit_amqqueue:check_exclusive_access(Q, ConnPid)
- catch exit:Reason -> {error, Reason}
- end;
- (_X, #exchange{}) ->
- ok
- end) of
- {error, {resources_missing, [{not_found, Name} | _]}} ->
- rabbit_misc:not_found(Name);
- {error, {resources_missing, [{absent, Q, Reason} | _]}} ->
- rabbit_misc:absent(Q, Reason);
- {error, binding_not_found} ->
- rabbit_misc:protocol_error(
- not_found, "no binding ~s between ~s and ~s",
- [RoutingKey, rabbit_misc:rs(ExchangeName),
- rabbit_misc:rs(DestinationName)]);
- {error, {binding_invalid, Fmt, Args}} ->
- rabbit_misc:protocol_error(precondition_failed, Fmt, Args);
- {error, #amqp_error{} = Error} ->
- rabbit_misc:protocol_error(Error);
- ok -> return_ok(State, NoWait, ReturnMethod)
- end.
-
-basic_return(#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content},
- State = #ch{protocol = Protocol, writer_pid = WriterPid},
- Reason) ->
- ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
- {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
- ok = rabbit_writer:send_command(
- WriterPid,
- #'basic.return'{reply_code = ReplyCode,
- reply_text = ReplyText,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- Content).
-
-reject(DeliveryTag, Requeue, Multiple,
- State = #ch{unacked_message_q = UAMQ, tx = Tx}) ->
- {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- State1 = State#ch{unacked_message_q = Remaining},
- {noreply, case Tx of
- none -> reject(Requeue, Acked, State1#ch.limiter),
- State1;
- {Msgs, Acks} -> Acks1 = ack_cons(Requeue, Acked, Acks),
- State1#ch{tx = {Msgs, Acks1}}
- end}.
-
-%% NB: Acked is in youngest-first order
-reject(Requeue, Acked, Limiter) ->
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
- end, Acked),
- ok = notify_limiter(Limiter, Acked).
-
-record_sent(ConsumerTag, AckRequired,
- Msg = {QName, QPid, MsgId, Redelivered, _Message},
- State = #ch{unacked_message_q = UAMQ,
- next_tag = DeliveryTag,
- trace_state = TraceState,
- user = #user{username = Username},
- conn_name = ConnName,
- channel = ChannelNum}) ->
- ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
- case Redelivered of
- true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
- false -> ok
- end,
- rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
- UAMQ1 = case AckRequired of
- true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
- UAMQ);
- false -> UAMQ
- end,
- State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
-
-%% NB: returns acks in youngest-first order
-collect_acks(Q, 0, true) ->
- {lists:reverse(queue:to_list(Q)), queue:new()};
-collect_acks(Q, DeliveryTag, Multiple) ->
- collect_acks([], [], Q, DeliveryTag, Multiple).
-
-collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
- case queue:out(Q) of
- {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}},
- QTail} ->
- if CurrentDeliveryTag == DeliveryTag ->
- {[UnackedMsg | ToAcc],
- case PrefixAcc of
- [] -> QTail;
- _ -> queue:join(
- queue:from_list(lists:reverse(PrefixAcc)),
- QTail)
- end};
- Multiple ->
- collect_acks([UnackedMsg | ToAcc], PrefixAcc,
- QTail, DeliveryTag, Multiple);
- true ->
- collect_acks(ToAcc, [UnackedMsg | PrefixAcc],
- QTail, DeliveryTag, Multiple)
- end;
- {empty, _} ->
- precondition_failed("unknown delivery tag ~w", [DeliveryTag])
- end.
-
-%% NB: Acked is in youngest-first order
-ack(Acked, State = #ch{queue_names = QNames}) ->
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- ?INCR_STATS(case dict:find(QPid, QNames) of
- {ok, QName} -> Count = length(MsgIds),
- [{queue_stats, QName, Count}];
- error -> []
- end, ack, State)
- end, Acked),
- ok = notify_limiter(State#ch.limiter, Acked).
-
-%% {Msgs, Acks}
-%%
-%% Msgs is a queue.
-%%
-%% Acks looks s.t. like this:
-%% [{false,[5,4]},{true,[3]},{ack,[2,1]}, ...]
-%%
-%% Each element is a pair consisting of a tag and a list of
-%% ack'ed/reject'ed msg ids. The tag is one of 'ack' (to ack), 'true'
-%% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as
-%% well as the list overall, are in "most-recent (generally youngest)
-%% ack first" order.
-new_tx() -> {queue:new(), []}.
-
-notify_queues(State = #ch{state = closing}) ->
- {ok, State};
-notify_queues(State = #ch{consumer_mapping = Consumers,
- delivering_queues = DQ }) ->
- QPids = sets:to_list(
- sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
- {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-
-foreach_per_queue(_F, []) ->
- ok;
-foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId]);
-%% NB: UAL should be in youngest-first order; the tree values will
-%% then be in oldest-first order
-foreach_per_queue(F, UAL) ->
- T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
- end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_foreach(F, T).
-
-consumer_queues(Consumers) ->
- lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}}
- <- dict:to_list(Consumers)]).
-
-%% tell the limiter about the number of acks that have been received
-%% for messages delivered to subscribed consumers, but not acks for
-%% messages sent in a response to a basic.get (identified by their
-%% 'none' consumer tag)
-notify_limiter(Limiter, Acked) ->
- %% optimisation: avoid the potentially expensive 'foldl' in the
- %% common case.
- case rabbit_limiter:is_active(Limiter) of
- false -> ok;
- true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
- 0 -> ok;
- Count -> rabbit_limiter:ack(Limiter, Count)
- end
- end.
-
-deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
- confirm = false,
- mandatory = false},
- []}, State) -> %% optimisation
- ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
- State;
-deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
- exchange_name = XName},
- mandatory = Mandatory,
- confirm = Confirm,
- msg_seq_no = MsgSeqNo},
- DelQNames}, State = #ch{queue_names = QNames,
- queue_monitors = QMons}) ->
- Qs = rabbit_amqqueue:lookup(DelQNames),
- DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
- %% The pmon:monitor_all/2 monitors all queues to which we
- %% delivered. But we want to monitor even queues we didn't deliver
- %% to, since we need their 'DOWN' messages to clean
- %% queue_names. So we also need to monitor each QPid from
- %% queues. But that only gets the masters (which is fine for
- %% cleaning queue_names), so we need the union of both.
- %%
- %% ...and we need to add even non-delivered queues to queue_names
- %% since alternative algorithms to update queue_names less
- %% frequently would in fact be more expensive in the common case.
- {QNames1, QMons1} =
- lists:foldl(fun (#amqqueue{pid = QPid, name = QName},
- {QNames0, QMons0}) ->
- {case dict:is_key(QPid, QNames0) of
- true -> QNames0;
- false -> dict:store(QPid, QName, QNames0)
- end, pmon:monitor(QPid, QMons0)}
- end, {QNames, pmon:monitor_all(DeliveredQPids, QMons)}, Qs),
- State1 = State#ch{queue_names = QNames1,
- queue_monitors = QMons1},
- %% NB: the order here is important since basic.returns must be
- %% sent before confirms.
- State2 = process_routing_mandatory(Mandatory, DeliveredQPids, MsgSeqNo,
- Message, State1),
- State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
- XName, State2),
- ?INCR_STATS([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QName, XName}, 1} ||
- QPid <- DeliveredQPids,
- {ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State3),
- State3.
-
-process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
- State;
-process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_route),
- State;
-process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) ->
- State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg,
- State#ch.mandatory)}.
-
-process_routing_confirm(false, _, _MsgSeqNo, _XName, State) ->
- State;
-process_routing_confirm(true, [], MsgSeqNo, XName, State) ->
- record_confirms([{MsgSeqNo, XName}], State);
-process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
- State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
- State#ch.unconfirmed)}.
-
-send_nacks([], State) ->
- State;
-send_nacks(_MXs, State = #ch{state = closing,
- tx = none}) -> %% optimisation
- State;
-send_nacks(MXs, State = #ch{tx = none}) ->
- coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
- fun(MsgSeqNo, Multiple) ->
- #'basic.nack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
- end, State);
-send_nacks(_MXs, State = #ch{state = closing}) -> %% optimisation
- State#ch{tx = failed};
-send_nacks(_, State) ->
- maybe_complete_tx(State#ch{tx = failed}).
-
-send_confirms(State = #ch{tx = none, confirmed = []}) ->
- State;
-send_confirms(State = #ch{tx = none, confirmed = C}) ->
- case rabbit_node_monitor:pause_minority_guard() of
- ok -> MsgSeqNos =
- lists:foldl(
- fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS([{exchange_stats, XName, 1}],
- confirm, State),
- [MsgSeqNo | MSNs]
- end, [], lists:append(C)),
- send_confirms(MsgSeqNos, State#ch{confirmed = []});
- pausing -> State
- end;
-send_confirms(State) ->
- case rabbit_node_monitor:pause_minority_guard() of
- ok -> maybe_complete_tx(State);
- pausing -> State
- end.
-
-send_confirms([], State) ->
- State;
-send_confirms(_Cs, State = #ch{state = closing}) -> %% optimisation
- State;
-send_confirms([MsgSeqNo], State) ->
- ok = send(#'basic.ack'{delivery_tag = MsgSeqNo}, State),
- State;
-send_confirms(Cs, State) ->
- coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
- #'basic.ack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
- end, State).
-
-coalesce_and_send(MsgSeqNos, MkMsgFun, State = #ch{unconfirmed = UC}) ->
- SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case dtree:is_empty(UC) of
- true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
- end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
- case Ms of
- [] -> ok;
- _ -> ok = send(MkMsgFun(lists:last(Ms), true), State)
- end,
- [ok = send(MkMsgFun(SeqNo, false), State) || SeqNo <- Ss],
- State.
-
-ack_cons(Tag, Acked, [{Tag, Acks} | L]) -> [{Tag, Acked ++ Acks} | L];
-ack_cons(Tag, Acked, Acks) -> [{Tag, Acked} | Acks].
-
-ack_len(Acks) -> lists:sum([length(L) || {ack, L} <- Acks]).
-
-maybe_complete_tx(State = #ch{tx = {_, _}}) ->
- State;
-maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
- case dtree:is_empty(UC) of
- false -> State;
- true -> complete_tx(State#ch{confirmed = []})
- end.
-
-complete_tx(State = #ch{tx = committing}) ->
- ok = send(#'tx.commit_ok'{}, State),
- State#ch{tx = new_tx()};
-complete_tx(State = #ch{tx = failed}) ->
- {noreply, State1} = handle_exception(
- rabbit_misc:amqp_error(
- precondition_failed, "partial tx completion", [],
- 'tx.commit'),
- State),
- State1#ch{tx = new_tx()}.
-
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
-
-i(pid, _) -> self();
-i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
-i(number, #ch{channel = Channel}) -> Channel;
-i(user, #ch{user = User}) -> User#user.username;
-i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{tx = Tx}) -> Tx =/= none;
-i(confirm, #ch{confirm_enabled = CE}) -> CE;
-i(name, State) -> name(State);
-i(consumer_count, #ch{consumer_mapping = CM}) -> dict:size(CM);
-i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ);
-i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
-i(messages_uncommitted, #ch{}) -> 0;
-i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
-i(acks_uncommitted, #ch{}) -> 0;
-i(state, #ch{state = running}) -> credit_flow:state();
-i(state, #ch{state = State}) -> State;
-i(prefetch_count, #ch{consumer_prefetch = C}) -> C;
-i(global_prefetch_count, #ch{limiter = Limiter}) ->
- rabbit_limiter:get_prefetch_limit(Limiter);
-i(Item, _) ->
- throw({bad_argument, Item}).
-
-name(#ch{conn_name = ConnName, channel = Channel}) ->
- list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-
-incr_stats(Incs, Measure) ->
- [update_measures(Type, Key, Inc, Measure) || {Type, Key, Inc} <- Incs].
-
-update_measures(Type, Key, Inc, Measure) ->
- Measures = case get({Type, Key}) of
- undefined -> [];
- D -> D
- end,
- Cur = case orddict:find(Measure, Measures) of
- error -> 0;
- {ok, C} -> C
- end,
- put({Type, Key}, orddict:store(Measure, Cur + Inc, Measures)).
-
-emit_stats(State) -> emit_stats(State, []).
-
-emit_stats(State, Extra) ->
- Coarse = infos(?STATISTICS_KEYS, State),
- case rabbit_event:stats_level(State, #ch.stats_timer) of
- coarse -> rabbit_event:notify(channel_stats, Extra ++ Coarse);
- fine -> Fine = [{channel_queue_stats,
- [{QName, Stats} ||
- {{queue_stats, QName}, Stats} <- get()]},
- {channel_exchange_stats,
- [{XName, Stats} ||
- {{exchange_stats, XName}, Stats} <- get()]},
- {channel_queue_exchange_stats,
- [{QX, Stats} ||
- {{queue_exchange_stats, QX}, Stats} <- get()]}],
- rabbit_event:notify(channel_stats, Extra ++ Coarse ++ Fine)
- end.
-
-erase_queue_stats(QName) ->
- erase({queue_stats, QName}),
- [erase({queue_exchange_stats, QX}) ||
- {{queue_exchange_stats, QX = {QName0, _}}, _} <- get(),
- QName0 =:= QName].