summaryrefslogtreecommitdiff
path: root/deps/amqp_client/src/amqp_channel.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/amqp_client/src/amqp_channel.erl')
-rw-r--r--deps/amqp_client/src/amqp_channel.erl1010
1 files changed, 1010 insertions, 0 deletions
diff --git a/deps/amqp_client/src/amqp_channel.erl b/deps/amqp_client/src/amqp_channel.erl
new file mode 100644
index 0000000000..9e95df4fe3
--- /dev/null
+++ b/deps/amqp_client/src/amqp_channel.erl
@@ -0,0 +1,1010 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
+%% @type amqp_reason(Type) = {Type, Code, Text}
+%% Code = non_neg_integer()
+%% Text = binary().
+%% @doc This module encapsulates the client's view of an AMQP
+%% channel. Each server side channel is represented by an amqp_channel
+%% process on the client side. Channel processes are created using the
+%% {@link amqp_connection} module. Channel processes are supervised
+%% under amqp_client's supervision tree.<br/>
+%% <br/>
+%% In case of a failure or an AMQP error, the channel process exits with a
+%% meaningful exit reason:<br/>
+%% <br/>
+%% <table>
+%% <tr>
+%% <td><strong>Cause</strong></td>
+%% <td><strong>Exit reason</strong></td>
+%% </tr>
+%% <tr>
+%% <td>Any reason, where Code would have been 200 otherwise</td>
+%% <td>```normal'''</td>
+%% </tr>
+%% <tr>
+%% <td>User application calls amqp_channel:close/3</td>
+%% <td>```close_reason(app_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server closes channel (soft error)</td>
+%% <td>```close_reason(server_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server misbehaved (did not follow protocol)</td>
+%% <td>```close_reason(server_misbehaved)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Connection is closing (causing all channels to cleanup and
+%% close)</td>
+%% <td>```{shutdown, {connection_closing, amqp_reason(atom())}}'''</td>
+%% </tr>
+%% <tr>
+%% <td>Other error</td>
+%% <td>(various error reasons, causing more detailed logging)</td>
+%% </tr>
+%% </table>
+%% <br/>
+%% See type definitions below.
+-module(amqp_channel).
+
+-include("amqp_client_internal.hrl").
+
+-behaviour(gen_server).
+
+-export([call/2, call/3, cast/2, cast/3, cast_flow/3]).
+-export([close/1, close/3]).
+-export([register_return_handler/2, unregister_return_handler/1,
+ register_flow_handler/2, unregister_flow_handler/1,
+ register_confirm_handler/2, unregister_confirm_handler/1]).
+-export([call_consumer/2, subscribe/3]).
+-export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2,
+ wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]).
+-export([start_link/5, set_writer/2, connection_closing/3, open/1,
+ enable_delivery_flow_control/1, notify_received/1]).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-define(TIMEOUT_FLUSH, 60000).
+
+-record(state, {number,
+ connection,
+ consumer,
+ driver,
+ rpc_requests = queue:new(),
+ closing = false, %% false |
+ %% {just_channel, Reason} |
+ %% {connection, Reason}
+ writer,
+ return_handler = none,
+ confirm_handler = none,
+ next_pub_seqno = 0,
+ flow_active = true,
+ flow_handler = none,
+ unconfirmed_set = gb_sets:new(),
+ waiting_set = gb_trees:empty(),
+ only_acks_received = true,
+
+ %% true | false, only relevant in the direct
+ %% client case.
+ %% when true, consumers will manually notify
+ %% queue pids using rabbit_amqqueue_common:notify_sent/2
+ %% to prevent the queue from overwhelming slow
+ %% consumers that use automatic acknowledgement
+ %% mode.
+ delivery_flow_control = false
+ }).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_method().
+%% This abstract datatype represents the set of methods that comprise
+%% the AMQP execution model. As indicated in the overview, the
+%% attributes of each method in the execution model are described in
+%% the protocol documentation. The Erlang record definitions are
+%% autogenerated from a parseable version of the specification. Most
+%% fields in the generated records have sensible default values that
+%% you need not worry in the case of a simple usage of the client
+%% library.
+
+%% @type amqp_msg() = #amqp_msg{}.
+%% This is the content encapsulated in content-bearing AMQP methods. It
+%% contains the following fields:
+%% <ul>
+%% <li>props :: class_property() - A class property record, defaults to
+%% #'P_basic'{}</li>
+%% <li>payload :: binary() - The arbitrary data payload</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% AMQP Channel API methods
+%%---------------------------------------------------------------------------
+
+%% @spec (Channel, Method) -> Result
+%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
+call(Channel, Method) ->
+ gen_server:call(Channel, {call, Method, none, self()}, amqp_util:call_timeout()).
+
+%% @spec (Channel, Method, Content) -> Result
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% Result = amqp_method() | ok | blocked | closing
+%% @doc This sends an AMQP method on the channel.
+%% For content bearing methods, Content has to be an amqp_msg(), whereas
+%% for non-content bearing methods, it needs to be the atom 'none'.<br/>
+%% In the case of synchronous methods, this function blocks until the
+%% corresponding reply comes back from the server and returns it.
+%% In the case of asynchronous methods, the function blocks until the method
+%% gets sent on the wire and returns the atom 'ok' on success.<br/>
+%% This will return the atom 'blocked' if the server has
+%% throttled the client for flow control reasons. This will return the
+%% atom 'closing' if the channel is in the process of shutting down.<br/>
+%% Note that for asynchronous methods, the synchronicity implied by
+%% 'call' only means that the client has transmitted the method to
+%% the broker. It does not necessarily imply that the broker has
+%% accepted responsibility for the message.
+call(Channel, Method, Content) ->
+ gen_server:call(Channel, {call, Method, Content, self()}, amqp_util:call_timeout()).
+
+%% @spec (Channel, Method) -> ok
+%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
+cast(Channel, Method) ->
+ gen_server:cast(Channel, {cast, Method, none, self(), noflow}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc This function is the same as {@link call/3}, except that it returns
+%% immediately with the atom 'ok', without blocking the caller process.
+%% This function is not recommended with synchronous methods, since there is no
+%% way to verify that the server has received the method.
+cast(Channel, Method, Content) ->
+ gen_server:cast(Channel, {cast, Method, Content, self(), noflow}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc Like cast/3, with flow control.
+cast_flow(Channel, Method, Content) ->
+ credit_flow:send(Channel),
+ gen_server:cast(Channel, {cast, Method, Content, self(), flow}).
+
+%% @spec (Channel) -> ok | closing
+%% where
+%% Channel = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(Channel) ->
+ close(Channel, 200, <<"Goodbye">>).
+
+%% @spec (Channel, Code, Text) -> ok | closing
+%% where
+%% Channel = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the channel, allowing the caller to supply a reply code and
+%% text. If the channel is already closing, the atom 'closing' is returned.
+close(Channel, Code, Text) ->
+ gen_server:call(Channel, {close, Code, Text}, amqp_util:call_timeout()).
+
+%% @spec (Channel) -> integer()
+%% where
+%% Channel = pid()
+%% @doc When in confirm mode, returns the sequence number of the next
+%% message to be published.
+next_publish_seqno(Channel) ->
+ gen_server:call(Channel, next_publish_seqno, amqp_util:call_timeout()).
+
+%% @spec (Channel) -> boolean() | 'timeout'
+%% where
+%% Channel = pid()
+%% @doc Wait until all messages published since the last call have
+%% been either ack'd or nack'd by the broker. Note, when called on a
+%% non-Confirm channel, waitForConfirms returns an error.
+%% @param Channel: the channel on which to wait.
+%% @end
+wait_for_confirms(Channel) ->
+ wait_for_confirms(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
+
+%% @spec (Channel, Timeout) -> boolean() | 'timeout'
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
+%% @doc Wait until all messages published since the last call have
+%% been either ack'd or nack'd by the broker or the timeout expires.
+%% Note, when called on a non-Confirm channel, waitForConfirms throws
+%% an exception.
+%% @param Channel: the channel on which to wait.
+%% @param Timeout: the wait timeout in seconds.
+%% @end
+wait_for_confirms(Channel, {Timeout, second}) ->
+ do_wait_for_confirms(Channel, second_to_millisecond(Timeout));
+wait_for_confirms(Channel, {Timeout, millisecond}) ->
+ do_wait_for_confirms(Channel, Timeout);
+wait_for_confirms(Channel, Timeout) ->
+ do_wait_for_confirms(Channel, second_to_millisecond(Timeout)).
+
+%% @spec (Channel) -> true
+%% where
+%% Channel = pid()
+%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
+%% received, the calling process is immediately sent an
+%% exit(nack_received).
+%% @param Channel: the channel on which to wait.
+%% @end
+wait_for_confirms_or_die(Channel) ->
+ wait_for_confirms_or_die(Channel, ?WAIT_FOR_CONFIRMS_TIMEOUT).
+
+%% @spec (Channel, Timeout) -> true
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | {non_neg_integer(), second | millisecond} | 'infinity'
+%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
+%% received, the calling process is immediately sent an
+%% exit(nack_received). If the timeout expires, the calling process is
+%% sent an exit(timeout).
+%% @param Channel: the channel on which to wait.
+%% @param Timeout: the wait timeout in seconds.
+%% @end
+wait_for_confirms_or_die(Channel, Timeout) ->
+ case wait_for_confirms(Channel, Timeout) of
+ timeout -> close(Channel, 200, <<"Confirm Timeout">>),
+ exit(timeout);
+ false -> close(Channel, 200, <<"Nacks Received">>),
+ exit(nacks_received);
+ true -> true
+ end.
+
+%% @spec (Channel, ReturnHandler) -> ok
+%% where
+%% Channel = pid()
+%% ReturnHandler = pid()
+%% @doc This registers a handler to deal with returned messages. The
+%% registered process will receive #basic.return{} records.
+register_return_handler(Channel, ReturnHandler) ->
+ gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the return handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_return_handler(Channel) ->
+ gen_server:cast(Channel, unregister_return_handler).
+
+%% @spec (Channel, ConfirmHandler) -> ok
+%% where
+%% Channel = pid()
+%% ConfirmHandler = pid()
+
+%% @doc This registers a handler to deal with confirm-related
+%% messages. The registered process will receive #basic.ack{} and
+%% #basic.nack{} commands.
+register_confirm_handler(Channel, ConfirmHandler) ->
+ gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the confirm handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_confirm_handler(Channel) ->
+ gen_server:cast(Channel, unregister_confirm_handler).
+
+%% @spec (Channel, FlowHandler) -> ok
+%% where
+%% Channel = pid()
+%% FlowHandler = pid()
+%% @doc This registers a handler to deal with channel flow notifications.
+%% The registered process will receive #channel.flow{} records.
+register_flow_handler(Channel, FlowHandler) ->
+ gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the flow handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_flow_handler(Channel) ->
+ gen_server:cast(Channel, unregister_flow_handler).
+
+%% @spec (Channel, Msg) -> ok
+%% where
+%% Channel = pid()
+%% Msg = any()
+%% @doc This causes the channel to invoke Consumer:handle_call/2,
+%% where Consumer is the amqp_gen_consumer implementation registered with
+%% the channel.
+call_consumer(Channel, Msg) ->
+ gen_server:call(Channel, {call_consumer, Msg}, amqp_util:call_timeout()).
+
+%% @spec (Channel, BasicConsume, Subscriber) -> ok
+%% where
+%% Channel = pid()
+%% BasicConsume = amqp_method()
+%% Subscriber = pid()
+%% @doc Subscribe the given pid to a queue using the specified
+%% basic.consume method.
+subscribe(Channel, BasicConsume = #'basic.consume'{}, Subscriber) ->
+ gen_server:call(Channel, {subscribe, BasicConsume, Subscriber}, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+%% @private
+start_link(Driver, Connection, ChannelNumber, Consumer, Identity) ->
+ gen_server:start_link(
+ ?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []).
+
+set_writer(Pid, Writer) ->
+ gen_server:cast(Pid, {set_writer, Writer}).
+
+enable_delivery_flow_control(Pid) ->
+ gen_server:cast(Pid, enable_delivery_flow_control).
+
+notify_received({Pid, QPid, ServerChPid}) ->
+ gen_server:cast(Pid, {send_notify, {QPid, ServerChPid}}).
+
+%% @private
+connection_closing(Pid, ChannelCloseType, Reason) ->
+ gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}).
+
+%% @private
+open(Pid) ->
+ gen_server:call(Pid, open, amqp_util:call_timeout()).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([Driver, Connection, ChannelNumber, Consumer, Identity]) ->
+ ?store_proc_name(Identity),
+ {ok, #state{connection = Connection,
+ driver = Driver,
+ number = ChannelNumber,
+ consumer = Consumer}}.
+
+%% @private
+handle_call(open, From, State) ->
+ {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
+%% @private
+handle_call({close, Code, Text}, From, State) ->
+ handle_close(Code, Text, From, State);
+%% @private
+handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
+ handle_method_to_server(Method, AmqpMsg, From, Sender, noflow, State);
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method, Content}, From, State) ->
+ Ret = handle_method_from_server(Method, Content, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method}, From, State) ->
+ Ret = handle_method_from_server(Method, none, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% @private
+handle_call(next_publish_seqno, _From,
+ State = #state{next_pub_seqno = SeqNo}) ->
+ {reply, SeqNo, State};
+handle_call({wait_for_confirms, Timeout}, From, State) ->
+ handle_wait_for_confirms(From, Timeout, State);
+%% @private
+handle_call({call_consumer, Msg}, _From,
+ State = #state{consumer = Consumer}) ->
+ {reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
+%% @private
+handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
+ handle_method_to_server(BasicConsume, none, From, Subscriber, noflow,
+ State).
+
+%% @private
+handle_cast({set_writer, Writer}, State = #state{driver = direct}) ->
+ link(Writer),
+ {noreply, State#state{writer = Writer}};
+handle_cast({set_writer, Writer}, State) ->
+ {noreply, State#state{writer = Writer}};
+%% @private
+handle_cast(enable_delivery_flow_control, State) ->
+ {noreply, State#state{delivery_flow_control = true}};
+%% @private
+handle_cast({send_notify, {QPid, ChPid}}, State) ->
+ rabbit_amqqueue_common:notify_sent(QPid, ChPid),
+ {noreply, State};
+%% @private
+handle_cast({cast, Method, AmqpMsg, Sender, noflow}, State) ->
+ handle_method_to_server(Method, AmqpMsg, none, Sender, noflow, State);
+handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) ->
+ credit_flow:ack(Sender),
+ handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State);
+%% @private
+handle_cast({register_return_handler, ReturnHandler}, State) ->
+ Ref = erlang:monitor(process, ReturnHandler),
+ {noreply, State#state{return_handler = {ReturnHandler, Ref}}};
+%% @private
+handle_cast(unregister_return_handler,
+ State = #state{return_handler = {_ReturnHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{return_handler = none}};
+%% @private
+handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
+ Ref = erlang:monitor(process, ConfirmHandler),
+ {noreply, State#state{confirm_handler = {ConfirmHandler, Ref}}};
+%% @private
+handle_cast(unregister_confirm_handler,
+ State = #state{confirm_handler = {_ConfirmHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{confirm_handler = none}};
+%% @private
+handle_cast({register_flow_handler, FlowHandler}, State) ->
+ Ref = erlang:monitor(process, FlowHandler),
+ {noreply, State#state{flow_handler = {FlowHandler, Ref}}};
+%% @private
+handle_cast(unregister_flow_handler,
+ State = #state{flow_handler = {_FlowHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{flow_handler = none}};
+%% Received from channels manager
+%% @private
+handle_cast({method, Method, Content, noflow}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Handles the situation when the connection closes without closing the channel
+%% beforehand. The channel must block all further RPCs,
+%% flush the RPC queue (optional), and terminate
+%% @private
+handle_cast({connection_closing, CloseType, Reason}, State) ->
+ handle_connection_closing(CloseType, Reason, State);
+%% @private
+handle_cast({shutdown, Shutdown}, State) ->
+ handle_shutdown(Shutdown, State).
+
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method}, State) ->
+ handle_method_from_server(Method, none, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method, Content}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command_and_notify, QPid, ChPid,
+ Method = #'basic.deliver'{}, Content},
+ State = #state{delivery_flow_control = MFC}) ->
+ case MFC of
+ false -> handle_method_from_server(Method, Content, State),
+ rabbit_amqqueue_common:notify_sent(QPid, ChPid);
+ true -> handle_method_from_server(Method, Content,
+ {self(), QPid, ChPid}, State)
+ end,
+ {noreply, State};
+%% This comes from the writer or rabbit_channel
+%% @private
+handle_info({channel_exit, _ChNumber, Reason}, State) ->
+ handle_channel_exit(Reason, State);
+%% This comes from rabbit_channel in the direct case
+handle_info({channel_closing, ChPid}, State) ->
+ ok = rabbit_channel_common:ready_for_close(ChPid),
+ {noreply, State};
+%% @private
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ {noreply, State};
+%% @private
+handle_info(timed_out_flushing_channel, State) ->
+ ?LOG_WARN("Channel (~p) closing: timed out flushing while "
+ "connection closing~n", [self()]),
+ {stop, timed_out_flushing_channel, State};
+%% @private
+handle_info({'DOWN', _, process, ReturnHandler, shutdown},
+ State = #state{return_handler = {ReturnHandler, _Ref}}) ->
+ {noreply, State#state{return_handler = none}};
+handle_info({'DOWN', _, process, ReturnHandler, Reason},
+ State = #state{return_handler = {ReturnHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
+ "Reason: ~p~n", [self(), ReturnHandler, Reason]),
+ {noreply, State#state{return_handler = none}};
+%% @private
+handle_info({'DOWN', _, process, ConfirmHandler, shutdown},
+ State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
+ {noreply, State#state{confirm_handler = none}};
+handle_info({'DOWN', _, process, ConfirmHandler, Reason},
+ State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. "
+ "Reason: ~p~n", [self(), ConfirmHandler, Reason]),
+ {noreply, State#state{confirm_handler = none}};
+%% @private
+handle_info({'DOWN', _, process, FlowHandler, shutdown},
+ State = #state{flow_handler = {FlowHandler, _Ref}}) ->
+ {noreply, State#state{flow_handler = none}};
+handle_info({'DOWN', _, process, FlowHandler, Reason},
+ State = #state{flow_handler = {FlowHandler, _Ref}}) ->
+ ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
+ "Reason: ~p~n", [self(), FlowHandler, Reason]),
+ {noreply, State#state{flow_handler = none}};
+handle_info({'DOWN', _, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue_common:notify_sent_queue_down(QPid),
+ {noreply, State};
+handle_info({confirm_timeout, From}, State = #state{waiting_set = WSet}) ->
+ case gb_trees:lookup(From, WSet) of
+ none ->
+ {noreply, State};
+ {value, _} ->
+ gen_server:reply(From, timeout),
+ {noreply, State#state{waiting_set = gb_trees:delete(From, WSet)}}
+ end.
+
+%% @private
+terminate(_Reason, State) ->
+ flush_writer(State),
+ State.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+%% RPC mechanism
+%%---------------------------------------------------------------------------
+
+handle_method_to_server(Method, AmqpMsg, From, Sender, Flow,
+ State = #state{unconfirmed_set = USet}) ->
+ case {check_invalid_method(Method), From,
+ check_block(Method, AmqpMsg, State)} of
+ {ok, _, ok} ->
+ State1 = case {Method, State#state.next_pub_seqno} of
+ {#'confirm.select'{}, 0} ->
+ %% The confirm seqno is set to 1 on the
+ %% first confirm.select only.
+ State#state{next_pub_seqno = 1};
+ {#'basic.publish'{}, 0} ->
+ State;
+ {#'basic.publish'{}, SeqNo} ->
+ State#state{unconfirmed_set =
+ gb_sets:add(SeqNo, USet),
+ next_pub_seqno = SeqNo + 1};
+ _ ->
+ State
+ end,
+ {noreply, rpc_top_half(Method, build_content(AmqpMsg),
+ From, Sender, Flow, State1)};
+ {ok, none, BlockReply} ->
+ ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
+ "Reason: ~p~n", [self(), Method, BlockReply]),
+ {noreply, State};
+ {ok, _, BlockReply} ->
+ {reply, BlockReply, State};
+ {{_, InvalidMethodMessage}, none, _} ->
+ ?LOG_WARN("Channel (~p): ignoring cast of ~p method. " ++
+ InvalidMethodMessage ++ "~n", [self(), Method]),
+ {noreply, State};
+ {{InvalidMethodReply, _}, _, _} ->
+ {reply, {error, InvalidMethodReply}, State}
+ end.
+
+handle_close(Code, Text, From, State) ->
+ Close = #'channel.close'{reply_code = Code,
+ reply_text = Text,
+ class_id = 0,
+ method_id = 0},
+ case check_block(Close, none, State) of
+ ok -> {noreply, rpc_top_half(Close, none, From, none, noflow,
+ State)};
+ BlockReply -> {reply, BlockReply, State}
+ end.
+
+rpc_top_half(Method, Content, From, Sender, Flow,
+ State0 = #state{rpc_requests = RequestQueue}) ->
+ State1 = State0#state{
+ rpc_requests = queue:in({From, Sender, Method, Content, Flow},
+ RequestQueue)},
+ IsFirstElement = queue:is_empty(RequestQueue),
+ if IsFirstElement -> do_rpc(State1);
+ true -> State1
+ end.
+
+rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
+ {{value, {From, _Sender, _Method, _Content, _Flow}}, RequestQueue1} =
+ queue:out(RequestQueue),
+ case From of
+ none -> ok;
+ _ -> gen_server:reply(From, Reply)
+ end,
+ do_rpc(State#state{rpc_requests = RequestQueue1}).
+
+do_rpc(State = #state{rpc_requests = Q,
+ closing = Closing}) ->
+ case queue:out(Q) of
+ {{value, {From, Sender, Method, Content, Flow}}, NewQ} ->
+ State1 = pre_do(Method, Content, Sender, State),
+ DoRet = do(Method, Content, Flow, State1),
+ case ?PROTOCOL:is_method_synchronous(Method) of
+ true -> State1;
+ false -> case {From, DoRet} of
+ {none, _} -> ok;
+ {_, ok} -> gen_server:reply(From, ok);
+ _ -> ok
+ %% Do not reply if error in do. Expecting
+ %% {channel_exit, _, _}
+ end,
+ do_rpc(State1#state{rpc_requests = NewQ})
+ end;
+ {empty, NewQ} ->
+ case Closing of
+ {connection, Reason} ->
+ gen_server:cast(self(),
+ {shutdown, {connection_closing, Reason}});
+ _ ->
+ ok
+ end,
+ State#state{rpc_requests = NewQ}
+ end.
+
+pending_rpc_method(#state{rpc_requests = Q}) ->
+ {value, {_From, _Sender, Method, _Content, _Flow}} = queue:peek(Q),
+ Method.
+
+pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
+ _Sender, State) ->
+ State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
+pre_do(#'basic.consume'{} = Method, none, Sender, State) ->
+ ok = call_to_consumer(Method, Sender, State),
+ State;
+pre_do(#'basic.cancel'{} = Method, none, Sender, State) ->
+ ok = call_to_consumer(Method, Sender, State),
+ State;
+pre_do(_, _, _, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Handling of methods from the server
+%%---------------------------------------------------------------------------
+
+safely_handle_method_from_server(Method, Content,
+ Continuation,
+ State = #state{closing = Closing}) ->
+ case is_connection_method(Method) of
+ true -> server_misbehaved(
+ #amqp_error{name = command_invalid,
+ explanation = "connection method on "
+ "non-zero channel",
+ method = element(1, Method)},
+ State);
+ false -> Drop = case {Closing, Method} of
+ {{just_channel, _}, #'channel.close'{}} -> false;
+ {{just_channel, _}, #'channel.close_ok'{}} -> false;
+ {{just_channel, _}, _} -> true;
+ _ -> false
+ end,
+ if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
+ "server because channel is closing~n",
+ [self(), {Method, Content}]),
+ {noreply, State};
+ true ->
+ Continuation()
+ end
+ end.
+
+handle_method_from_server(Method, Content, State) ->
+ Fun = fun () ->
+ handle_method_from_server1(Method,
+ amqp_msg(Content), State)
+ end,
+ safely_handle_method_from_server(Method, Content, Fun, State).
+
+handle_method_from_server(Method = #'basic.deliver'{},
+ Content, DeliveryCtx, State) ->
+ Fun = fun () ->
+ handle_method_from_server1(Method,
+ amqp_msg(Content),
+ DeliveryCtx,
+ State)
+ end,
+ safely_handle_method_from_server(Method, Content, Fun, State).
+
+handle_method_from_server1(#'channel.open_ok'{}, none, State) ->
+ {noreply, rpc_bottom_half(ok, State)};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text},
+ none,
+ State = #state{closing = {just_channel, _}}) ->
+ %% Both client and server sent close at the same time. Don't shutdown yet,
+ %% wait for close_ok.
+ do(#'channel.close_ok'{}, none, noflow, State),
+ {noreply,
+ State#state{
+ closing = {just_channel, {server_initiated_close, Code, Text}}}};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text}, none, State) ->
+ do(#'channel.close_ok'{}, none, noflow, State),
+ handle_shutdown({server_initiated_close, Code, Text}, State);
+handle_method_from_server1(#'channel.close_ok'{}, none,
+ State = #state{closing = Closing}) ->
+ case Closing of
+ {just_channel, {app_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason, rpc_bottom_half(ok, State));
+ {just_channel, {server_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason,
+ rpc_bottom_half(closing, State));
+ {connection, Reason} ->
+ handle_shutdown({connection_closing, Reason}, State)
+ end;
+handle_method_from_server1(#'basic.consume_ok'{} = ConsumeOk, none, State) ->
+ Consume = #'basic.consume'{} = pending_rpc_method(State),
+ ok = call_to_consumer(ConsumeOk, Consume, State),
+ {noreply, rpc_bottom_half(ConsumeOk, State)};
+handle_method_from_server1(#'basic.cancel_ok'{} = CancelOk, none, State) ->
+ Cancel = #'basic.cancel'{} = pending_rpc_method(State),
+ ok = call_to_consumer(CancelOk, Cancel, State),
+ {noreply, rpc_bottom_half(CancelOk, State)};
+handle_method_from_server1(#'basic.cancel'{} = Cancel, none, State) ->
+ ok = call_to_consumer(Cancel, none, State),
+ {noreply, State};
+handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
+ ok = call_to_consumer(Deliver, AmqpMsg, State),
+ {noreply, State};
+handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
+ State = #state{flow_handler = FlowHandler}) ->
+ case FlowHandler of none -> ok;
+ {Pid, _Ref} -> Pid ! Flow
+ end,
+ %% Putting the flow_ok in the queue so that the RPC queue can be
+ %% flushed beforehand. Methods that made it to the queue are not
+ %% blocked in any circumstance.
+ {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
+ none, noflow, State#state{flow_active = Active})};
+handle_method_from_server1(
+ #'basic.return'{} = BasicReturn, AmqpMsg,
+ State = #state{return_handler = ReturnHandler}) ->
+ case ReturnHandler of
+ none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is "
+ "no return handler registered~n",
+ [self(), BasicReturn, AmqpMsg]);
+ {Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg}
+ end,
+ {noreply, State};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{confirm_handler = none} = State) ->
+ {noreply, update_confirm_set(BasicAck, State)};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{confirm_handler = {CH, _Ref}} = State) ->
+ CH ! BasicAck,
+ {noreply, update_confirm_set(BasicAck, State)};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler = none} = State) ->
+ ?LOG_WARN("Channel (~p): received ~p but there is no "
+ "confirm handler registered~n", [self(), BasicNack]),
+ {noreply, update_confirm_set(BasicNack, State)};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler = {CH, _Ref}} = State) ->
+ CH ! BasicNack,
+ {noreply, update_confirm_set(BasicNack, State)};
+
+handle_method_from_server1(#'basic.credit_drained'{} = CreditDrained, none,
+ #state{consumer = Consumer} = State) ->
+ Consumer ! CreditDrained,
+ {noreply, State};
+handle_method_from_server1(Method, none, State) ->
+ {noreply, rpc_bottom_half(Method, State)};
+handle_method_from_server1(Method, Content, State) ->
+ {noreply, rpc_bottom_half({Method, Content}, State)}.
+
+%% only used with manual consumer-to-queue flow control
+handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg,
+ DeliveryCtx, State) ->
+ ok = call_to_consumer(Deliver, AmqpMsg, DeliveryCtx, State),
+ {noreply, State}.
+
+%%---------------------------------------------------------------------------
+%% Other handle_* functions
+%%---------------------------------------------------------------------------
+
+handle_connection_closing(CloseType, Reason,
+ State = #state{rpc_requests = RpcQueue,
+ closing = Closing}) ->
+ NewState = State#state{closing = {connection, Reason}},
+ case {CloseType, Closing, queue:is_empty(RpcQueue)} of
+ {flush, false, false} ->
+ erlang:send_after(?TIMEOUT_FLUSH, self(),
+ timed_out_flushing_channel),
+ {noreply, NewState};
+ {flush, {just_channel, _}, false} ->
+ {noreply, NewState};
+ _ ->
+ handle_shutdown({connection_closing, Reason}, NewState)
+ end.
+
+handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
+ State = #state{connection = Connection, number = Number}) ->
+ %% Sent by rabbit_channel for hard errors in the direct case
+ ?LOG_ERR("connection ~p, channel ~p - error:~n~p~n",
+ [Connection, Number, Reason]),
+ {true, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
+ ReportedReason = {server_initiated_close, Code, Expl},
+ amqp_gen_connection:hard_error_in_channel(
+ Connection, self(), ReportedReason),
+ handle_shutdown({connection_closing, ReportedReason}, State);
+handle_channel_exit(Reason, State) ->
+ %% Unexpected death of a channel infrastructure process
+ {stop, {infrastructure_died, Reason}, State}.
+
+handle_shutdown({_, 200, _}, State) ->
+ {stop, normal, State};
+handle_shutdown({connection_closing, {_, 200, _}}, State) ->
+ {stop, normal, State};
+handle_shutdown({connection_closing, normal}, State) ->
+ {stop, normal, State};
+handle_shutdown(Reason, State) ->
+ {stop, {shutdown, Reason}, State}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+do(Method, Content, Flow, #state{driver = network, writer = W}) ->
+ %% Catching because it expects the {channel_exit, _, _} message on error
+ catch case {Content, Flow} of
+ {none, _} -> rabbit_writer:send_command(W, Method);
+ {_, flow} -> rabbit_writer:send_command_flow(W, Method,
+ Content);
+ {_, noflow} -> rabbit_writer:send_command(W, Method, Content)
+ end;
+do(Method, Content, Flow, #state{driver = direct, writer = W}) ->
+ %% ditto catching because...
+ catch case {Content, Flow} of
+ {none, _} -> rabbit_channel_common:do(W, Method);
+ {_, flow} -> rabbit_channel_common:do_flow(W, Method, Content);
+ {_, noflow} -> rabbit_channel_common:do(W, Method, Content)
+ end.
+
+
+flush_writer(#state{driver = network, writer = Writer}) ->
+ try
+ rabbit_writer:flush(Writer)
+ catch
+ exit:noproc -> ok
+ end;
+flush_writer(#state{driver = direct}) ->
+ ok.
+amqp_msg(none) ->
+ none;
+amqp_msg(Content) ->
+ {Props, Payload} = rabbit_basic_common:from_content(Content),
+ #amqp_msg{props = Props, payload = Payload}.
+
+build_content(none) ->
+ none;
+build_content(#amqp_msg{props = Props, payload = Payload}) ->
+ rabbit_basic_common:build_content(Props, Payload).
+
+check_block(_Method, _AmqpMsg, #state{closing = {just_channel, _}}) ->
+ closing;
+check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
+ closing;
+check_block(_Method, none, #state{}) ->
+ ok;
+check_block(_Method, #amqp_msg{}, #state{flow_active = false}) ->
+ blocked;
+check_block(_Method, _AmqpMsg, #state{}) ->
+ ok.
+
+check_invalid_method(#'channel.open'{}) ->
+ {use_amqp_connection_module,
+ "Use amqp_connection:open_channel/{1,2} instead"};
+check_invalid_method(#'channel.close'{}) ->
+ {use_close_function, "Use close/{1,3} instead"};
+check_invalid_method(Method) ->
+ case is_connection_method(Method) of
+ true -> {connection_methods_not_allowed,
+ "Sending connection methods is not allowed"};
+ false -> ok
+ end.
+
+is_connection_method(Method) ->
+ {ClassId, _} = ?PROTOCOL:method_id(element(1, Method)),
+ ?PROTOCOL:lookup_class_name(ClassId) == connection.
+
+server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
+ case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
+ {0, _} ->
+ handle_shutdown({server_misbehaved, AmqpError}, State);
+ {_, Close} ->
+ ?LOG_WARN("Channel (~p) flushing and closing due to soft "
+ "error caused by the server ~p~n", [self(), AmqpError]),
+ Self = self(),
+ spawn(fun () -> call(Self, Close) end),
+ {noreply, State}
+ end.
+
+update_confirm_set(#'basic.ack'{delivery_tag = SeqNo,
+ multiple = Multiple},
+ State = #state{unconfirmed_set = USet}) ->
+ maybe_notify_waiters(
+ State#state{unconfirmed_set =
+ update_unconfirmed(SeqNo, Multiple, USet)});
+update_confirm_set(#'basic.nack'{delivery_tag = SeqNo,
+ multiple = Multiple},
+ State = #state{unconfirmed_set = USet}) ->
+ maybe_notify_waiters(
+ State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet),
+ only_acks_received = false}).
+
+update_unconfirmed(SeqNo, false, USet) ->
+ gb_sets:del_element(SeqNo, USet);
+update_unconfirmed(SeqNo, true, USet) ->
+ case gb_sets:is_empty(USet) of
+ true -> USet;
+ false -> {S, USet1} = gb_sets:take_smallest(USet),
+ case S > SeqNo of
+ true -> USet;
+ false -> update_unconfirmed(SeqNo, true, USet1)
+ end
+ end.
+
+maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
+ case gb_sets:is_empty(USet) of
+ false -> State;
+ true -> notify_confirm_waiters(State)
+ end.
+
+notify_confirm_waiters(State = #state{waiting_set = WSet,
+ only_acks_received = OAR}) ->
+ [begin
+ safe_cancel_timer(TRef),
+ gen_server:reply(From, OAR)
+ end || {From, TRef} <- gb_trees:to_list(WSet)],
+ State#state{waiting_set = gb_trees:empty(),
+ only_acks_received = true}.
+
+do_wait_for_confirms(Channel, Timeout) when is_integer(Timeout) ->
+ case gen_server:call(Channel, {wait_for_confirms, Timeout}, amqp_util:call_timeout()) of
+ {error, Reason} -> throw(Reason);
+ Other -> Other
+ end.
+
+handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
+ {reply, {error, not_in_confirm_mode}, State};
+handle_wait_for_confirms(From, Timeout,
+ State = #state{unconfirmed_set = USet,
+ waiting_set = WSet}) ->
+ case gb_sets:is_empty(USet) of
+ true -> {reply, true, State};
+ false -> TRef = case Timeout of
+ infinity -> undefined;
+ _ -> erlang:send_after(
+ Timeout, self(),
+ {confirm_timeout, From})
+ end,
+ {noreply,
+ State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
+ end.
+
+call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
+ amqp_gen_consumer:call_consumer(Consumer, Method, Args).
+
+call_to_consumer(Method, Args, DeliveryCtx, #state{consumer = Consumer}) ->
+ amqp_gen_consumer:call_consumer(Consumer, Method, Args, DeliveryCtx).
+
+safe_cancel_timer(undefined) -> ok;
+safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
+
+second_to_millisecond(Timeout) ->
+ Timeout * 1000.