summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-07-16 11:15:15 +0100
committerTim Watson <tim@rabbitmq.com>2012-07-16 11:15:15 +0100
commit079188669234c2522b61284bee46574410a5f158 (patch)
tree1ce52cc7a711ae0ecfc4b760ded640cef3c99fad
parent82b008482ae58b0f10af773b473f1ebd19e818b5 (diff)
parentb78427551f4a11ee1e83062711629ced04afdfbc (diff)
downloadrabbitmq-server-079188669234c2522b61284bee46574410a5f158.tar.gz
merge default
-rw-r--r--Makefile2
-rwxr-xr-xpackaging/macports/make-port-diff.sh6
-rw-r--r--src/file_handle_cache.erl10
-rw-r--r--src/rabbit_channel.erl52
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl13
-rw-r--r--src/rabbit_mirror_queue_slave.erl13
-rw-r--r--src/rabbit_misc.erl7
-rw-r--r--src/rabbit_nodes.erl4
-rw-r--r--src/rabbit_prelaunch.erl6
-rw-r--r--src/rabbit_reader.erl135
10 files changed, 124 insertions, 124 deletions
diff --git a/Makefile b/Makefile
index 64109312..7c2c8c46 100644
--- a/Makefile
+++ b/Makefile
@@ -216,7 +216,7 @@ run-qc: all
start-background-node: all
-rm -f $(RABBITMQ_MNESIA_DIR).pid
mkdir -p $(RABBITMQ_MNESIA_DIR)
- setsid sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" &
+ nohup sh -c "$(MAKE) run-background-node > $(RABBITMQ_MNESIA_DIR)/startup_log 2> $(RABBITMQ_MNESIA_DIR)/startup_err" > /dev/null &
./scripts/rabbitmqctl -n $(RABBITMQ_NODENAME) wait $(RABBITMQ_MNESIA_DIR).pid kernel
start-rabbit-on-node: all
diff --git a/packaging/macports/make-port-diff.sh b/packaging/macports/make-port-diff.sh
index 3eb1b9f5..ac3afa4e 100755
--- a/packaging/macports/make-port-diff.sh
+++ b/packaging/macports/make-port-diff.sh
@@ -14,8 +14,10 @@ mkdir -p $dir/macports $dir/rabbitmq
cd $dir/macports
svn checkout http://svn.macports.org/repository/macports/trunk/dports/net/rabbitmq-server/ 2>&1 >/dev/null
-# Clear out the svn $id tag
-sed -i -e 's|^# \$.*$|# $Id$|' rabbitmq-server/Portfile
+# Clear out the svn $id tag from the Portfile (and avoid using -i)
+portfile=rabbitmq-server/Portfile
+sed -e 's|^# \$.*$|# $Id$|' ${portfile} > ${portfile}.new
+mv ${portfile}.new ${portfile}
# Get the files from the rabbitmq.com macports repo
cd ../rabbitmq
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f3b4dbaf..13ee4249 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -374,11 +374,11 @@ sync(Ref) ->
end).
needs_sync(Ref) ->
- with_handles(
- [Ref],
- fun ([#handle { is_dirty = false, write_buffer = [] }]) -> false;
- ([_Handle]) -> true
- end).
+ %% This must *not* use with_handles/2; see bug 25052
+ case get({Ref, fhc_handle}) of
+ #handle { is_dirty = false, write_buffer = [] } -> false;
+ #handle {} -> true
+ end.
position(Ref, NewOffset) ->
with_flushed_handles(
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 22c6a223..864e100a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -267,7 +267,7 @@ handle_cast({method, Method, Content, Flow},
catch
exit:Reason = #amqp_error{} ->
MethodName = rabbit_misc:method_record_type(Method),
- send_exception(Reason#amqp_error{method = MethodName}, State);
+ handle_exception(Reason#amqp_error{method = MethodName}, State);
_:Reason ->
{stop, {Reason, erlang:get_stacktrace()}, State}
end;
@@ -400,11 +400,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
-send_exception(Reason, State = #ch{protocol = Protocol,
- channel = Channel,
- writer_pid = WriterPid,
- reader_pid = ReaderPid,
- conn_pid = ConnPid}) ->
+handle_exception(Reason, State = #ch{protocol = Protocol,
+ channel = Channel,
+ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
+ conn_pid = ConnPid}) ->
{CloseChannel, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
@@ -418,6 +418,11 @@ send_exception(Reason, State = #ch{protocol = Protocol,
{stop, normal, State1}
end.
+precondition_failed(Format) -> precondition_failed(Format, []).
+
+precondition_failed(Format, Params) ->
+ rabbit_misc:protocol_error(precondition_failed, Format, Params).
+
return_queue_declare_ok(#resource{name = ActualName},
NoWait, MessageCount, ConsumerCount, State) ->
return_ok(State#ch{most_recently_declared_queue = ActualName}, NoWait,
@@ -461,9 +466,9 @@ check_user_id_header(#'P_basic'{user_id = Username},
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
#ch{user = #user{username = Actual}}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "user_id property set to '~s' but "
- "authenticated user was '~s'", [Claimed, Actual]).
+ precondition_failed(
+ "user_id property set to '~s' but authenticated user was '~s'",
+ [Claimed, Actual]).
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -625,8 +630,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
State1#ch{uncommitted_message_q = NewTMQ}
end};
{error, Reason} ->
- rabbit_misc:protocol_error(precondition_failed,
- "invalid message: ~p", [Reason])
+ precondition_failed("invalid message: ~p", [Reason])
end;
handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
@@ -881,8 +885,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -980,11 +983,9 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- rabbit_misc:protocol_error(
- precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
+ precondition_failed("~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{message_count = PurgedMessageCount})
@@ -1019,15 +1020,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
#'queue.purge_ok'{message_count = PurgedMessageCount});
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from confirm to tx mode", []);
+ precondition_failed("cannot switch from confirm to tx mode");
handle_method(#'tx.select'{}, _, State) ->
{reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.commit'{}, _,
State = #ch{uncommitted_message_q = TMQ,
@@ -1041,8 +1040,7 @@ handle_method(#'tx.commit'{}, _,
{noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "channel is not transactional", []);
+ precondition_failed("channel is not transactional");
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
uncommitted_acks = TAL,
@@ -1052,8 +1050,7 @@ handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
- rabbit_misc:protocol_error(
- precondition_failed, "cannot switch from tx to confirm mode", []);
+ precondition_failed("cannot switch from tx to confirm mode");
handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
@@ -1263,8 +1260,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- rabbit_misc:protocol_error(
- precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed("unknown delivery tag ~w", [DeliveryTag])
end.
ack(Acked, State) ->
@@ -1423,7 +1419,7 @@ complete_tx(State = #ch{tx_status = committing}) ->
ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
State#ch{tx_status = in_progress};
complete_tx(State = #ch{tx_status = failed}) ->
- {noreply, State1} = send_exception(
+ {noreply, State1} = handle_exception(
rabbit_misc:amqp_error(
precondition_failed, "partial tx completion", [],
'tx.commit'),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 3e058793..10debb0b 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -36,8 +36,6 @@
length_fun
}).
--define(ONE_SECOND, 1000).
-
-ifdef(use_specs).
-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
@@ -325,7 +323,6 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = pmon:new(),
@@ -359,11 +356,6 @@ handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
handle_cast({delete_and_terminate, Reason}, State) ->
{stop, Reason, State}.
-handle_info(send_gm_heartbeat, State = #state { gm = GM }) ->
- gm:broadcast(GM, heartbeat),
- ensure_gm_heartbeat(),
- noreply(State);
-
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Mons,
death_fun = DeathFun }) ->
@@ -399,7 +391,7 @@ members_changed([_CPid], _Births, []) ->
members_changed([CPid], _Births, Deaths) ->
ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
-handle_msg([_CPid], _From, heartbeat) ->
+handle_msg([_CPid], _From, master_changed) ->
ok;
handle_msg([CPid], _From, request_length = Msg) ->
ok = gen_server2:cast(CPid, Msg);
@@ -420,6 +412,3 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
-
-ensure_gm_heartbeat() ->
- erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 03fafc3e..60d3e027 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -199,7 +199,12 @@ handle_call({gm_deaths, Deaths}, From,
%% master has changed to not us.
gen_server2:reply(From, ok),
erlang:monitor(process, Pid),
- ok = gm:broadcast(GM, heartbeat),
+ %% GM is lazy. So we know of the death of the
+ %% slave since it is a neighbour of ours, but
+ %% until a message is sent, not all members will
+ %% know. That might include the new master. So
+ %% broadcast a no-op message to wake everyone up.
+ ok = gm:broadcast(GM, master_changed),
noreply(State #state { master_pid = Pid })
end
end;
@@ -341,7 +346,7 @@ members_changed([_SPid], _Births, []) ->
members_changed([SPid], _Births, Deaths) ->
inform_deaths(SPid, Deaths).
-handle_msg([_SPid], _From, heartbeat) ->
+handle_msg([_SPid], _From, master_changed) ->
ok;
handle_msg([_SPid], _From, request_length) ->
%% This is only of value to the master
@@ -452,7 +457,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
- ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% TODO this has been in here since the beginning, but it's not
+ %% obvious if it is needed. Investigate...
+ ok = gm:confirmed_broadcast(GM, master_changed),
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index d41aa09b..1fefa688 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,7 +36,7 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([tcp_name/3]).
+-export([tcp_name/3, format_inet_error/1]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
@@ -152,6 +152,7 @@
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
+-spec(format_inet_error/1 :: (atom()) -> string()).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(table_filter/3:: (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'),
@@ -510,6 +511,10 @@ tcp_name(Prefix, IPAddress, Port)
list_to_atom(
format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
+format_inet_error(address) -> "cannot connect to host/port";
+format_inet_error(timeout) -> "timed out";
+format_inet_error(Error) -> inet:format_error(Error).
+
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
%% the order in which results are received.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 1c23632d..c8d77b0f 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -70,8 +70,8 @@ diagnostics0() ->
diagnostics_host(Host) ->
case names(Host) of
{error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [Host, EpmdReason]};
+ {"- unable to connect to epmd on ~s: ~w (~s)",
+ [Host, EpmdReason, rabbit_misc:format_inet_error(EpmdReason)]};
{ok, NamePorts} ->
{"- ~s: ~p",
[Host, [{list_to_atom(Name), Port} ||
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index d56211b5..b0454435 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -67,9 +67,5 @@ duplicate_node_check(NodeStr) ->
{error, EpmdReason} ->
rabbit_misc:quit("epmd error for host ~p: ~p (~s)~n",
[NodeHost, EpmdReason,
- case EpmdReason of
- address -> "unable to establish tcp connection";
- timeout -> "timed out establishing tcp connection";
- _ -> inet:format_error(EpmdReason)
- end])
+ rabbit_misc:format_inet_error(EpmdReason)])
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bd5cf588..75246007 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -173,6 +173,8 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+%%--------------------------------------------------------------------------
+
log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
@@ -383,6 +385,9 @@ update_last_blocked_by(State = #v1{conserve_resources = true}) ->
update_last_blocked_by(State = #v1{conserve_resources = false}) ->
State#v1{last_blocked_by = flow}.
+%%--------------------------------------------------------------------------
+%% error handling / termination
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -412,18 +417,6 @@ handle_dependent_exit(ChPid, Reason, State) ->
Channel, Reason))
end.
-channel_cleanup(ChPid) ->
- case get({ch_pid, ChPid}) of
- undefined -> undefined;
- {Channel, MRef} -> credit_flow:peer_down(ChPid),
- erase({channel, Channel}),
- erase({ch_pid, ChPid}),
- erlang:demonitor(MRef, [flush]),
- Channel
- end.
-
-all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
-
terminate_channels() ->
NChannels =
length([rabbit_channel:shutdown(ChPid) || ChPid <- all_channels()]),
@@ -477,6 +470,53 @@ maybe_close(State) ->
termination_kind(normal) -> controlled;
termination_kind(_) -> uncontrolled.
+handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
+ State;
+handle_exception(State, Channel, Reason) ->
+ send_exception(State, Channel, Reason).
+
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {0, CloseMethod} =
+ rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
+ terminate_channels(),
+ State1 = close_connection(State),
+ ok = send_on_channel0(State1#v1.sock, CloseMethod, Protocol),
+ State1.
+
+%%--------------------------------------------------------------------------
+
+create_channel(Channel, State) ->
+ #v1{sock = Sock, queue_collector = Collector,
+ channel_sup_sup_pid = ChanSupSup,
+ connection = #connection{protocol = Protocol,
+ frame_max = FrameMax,
+ user = User,
+ vhost = VHost,
+ capabilities = Capabilities}} = State,
+ {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
+ Protocol, User, VHost, Capabilities, Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
+
+channel_cleanup(ChPid) ->
+ case get({ch_pid, ChPid}) of
+ undefined -> undefined;
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
+ erase({ch_pid, ChPid}),
+ erlang:demonitor(MRef, [flush]),
+ Channel
+ end.
+
+all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+
+%%--------------------------------------------------------------------------
+
handle_frame(Type, 0, Payload,
State = #v1{connection_state = CS,
connection = #connection{protocol = Protocol}})
@@ -522,6 +562,17 @@ process_frame(Frame, Channel, State) ->
Channel, State#v1.connection_state, Frame})
end.
+process_channel_frame(Frame, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> {ok, NewAState};
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
+ end.
+
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
control_throttle(State);
@@ -536,13 +587,15 @@ post_process_frame({method, MethodName, _}, _ChPid,
post_process_frame(_Frame, _ChPid, State) ->
control_throttle(State).
+%%--------------------------------------------------------------------------
+
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
PayloadSize + 1));
-handle_input({frame_payload, Type, Channel, PayloadSize},
- PayloadAndMarker, State) ->
+handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker,
+ State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
switch_callback(handle_frame(Type, Channel, Payload, State),
@@ -834,8 +887,8 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- socket_info(fun () -> rabbit_net:getstat(Sock, [SockStat]) end,
- fun ([{_, I}]) -> I end);
+ socket_info(fun (S) -> rabbit_net:getstat(S, [SockStat]) end,
+ fun ([{_, I}]) -> I end, Sock);
i(state, #v1{connection_state = S}) ->
S;
i(last_blocked_by, #v1{last_blocked_by = By}) ->
@@ -871,10 +924,7 @@ i(Item, #v1{}) ->
throw({bad_argument, Item}).
socket_info(Get, Select, Sock) ->
- socket_info(fun() -> Get(Sock) end, Select).
-
-socket_info(Get, Select) ->
- case Get() of
+ case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
@@ -897,51 +947,6 @@ cert_info(F, Sock) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
-%%--------------------------------------------------------------------------
-
-create_channel(Channel, State) ->
- #v1{sock = Sock, queue_collector = Collector,
- channel_sup_sup_pid = ChanSupSup,
- connection = #connection{protocol = Protocol,
- frame_max = FrameMax,
- user = User,
- vhost = VHost,
- capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), name(Sock),
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- ok.
-
-process_channel_frame(Frame, ChPid, AState) ->
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> {ok, NewAState};
- {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- {ok, NewAState};
- {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
- ChPid, Method, Content),
- {ok, NewAState};
- {error, Reason} -> {error, Reason}
- end.
-
-handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
- State;
-handle_exception(State, Channel, Reason) ->
- send_exception(State, Channel, Reason).
-
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
- Channel, Reason) ->
- {0, CloseMethod} =
- rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
- terminate_channels(),
- State1 = close_connection(State),
- ok = rabbit_writer:internal_send_command(
- State1#v1.sock, 0, CloseMethod, Protocol),
- State1.
-
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).