diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 16:10:29 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2011-08-03 16:10:29 +0100 |
commit | b407519e245db1fb0c69c57e86ca7a2dfd534a5c (patch) | |
tree | 37867f142ee681fb8807acbf657f3b4ca83dcf74 | |
parent | 8bbc16019d3def0c4a98135f852e11e31cc4c9e1 (diff) | |
download | rabbitmq-server-b407519e245db1fb0c69c57e86ca7a2dfd534a5c.tar.gz |
Also abstract assemble_frames, duh. Also lose send_close_frame/1, dumb idea.
-rw-r--r-- | src/rabbit_amqp.erl | 1 | ||||
-rw-r--r-- | src/rabbit_amqp_0_x.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel_sup.erl | 6 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 14 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 64 |
5 files changed, 51 insertions, 55 deletions
diff --git a/src/rabbit_amqp.erl b/src/rabbit_amqp.erl index c510a494..9c748a77 100644 --- a/src/rabbit_amqp.erl +++ b/src/rabbit_amqp.erl @@ -22,7 +22,6 @@ behaviour_info(callbacks) -> [ {accept_handshake_bytes, 1}, {start_connection, 2}, - {send_close_frame, 1}, {handle_input, 3}, {assemble_frame, 3}, {assemble_frames, 5} diff --git a/src/rabbit_amqp_0_x.erl b/src/rabbit_amqp_0_x.erl index a7f5891b..69a855a4 100644 --- a/src/rabbit_amqp_0_x.erl +++ b/src/rabbit_amqp_0_x.erl @@ -18,8 +18,8 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([accept_handshake_bytes/1, start_connection/2, send_close_frame/1, - handle_input/3, assemble_frame/3, assemble_frames/5]). +-export([accept_handshake_bytes/1, start_connection/2, handle_input/3, + assemble_frame/3, assemble_frames/5]). -export([process_channel_frame/5]). %% used by erlang-client TODO @@ -94,9 +94,6 @@ accept_handshake_bytes(<<"AMQP", 1, 1, 9, 1>>) -> accept_handshake_bytes(_) -> false. -send_close_frame(Sock) -> - exit(bang). - handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> rabbit_reader:ensure_stats_timer( rabbit_reader:switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, @@ -275,7 +272,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, stats_timer = StatsTimer}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - ok = rabbit_reader:send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + ok = rabbit_reader:send_on_channel0(Sock, #'connection.open_ok'{}, ?MODULE, Protocol), State1 = rabbit_reader:internal_conserve_memory( rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), State#v1{connection_state = running, @@ -296,7 +293,7 @@ handle_method0(#'connection.close'{}, when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = rabbit_reader:send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + ok = rabbit_reader:send_on_channel0(Sock, #'connection.close_ok'{}, ?MODULE, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -319,8 +316,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, - VHost, Capabilities, Collector}), + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), ?MODULE, Protocol, + User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), @@ -384,13 +381,13 @@ auth_phase(Response, rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> Secure = #'connection.secure'{challenge = Challenge}, - ok = rabbit_reader:send_on_channel0(Sock, Secure, Protocol), + ok = rabbit_reader:send_on_channel0(Sock, Secure, ?MODULE, Protocol), State#v1{auth_state = AuthState1}; {ok, User} -> Tune = #'connection.tune'{channel_max = 0, frame_max = rabbit_reader:server_frame_max(), heartbeat = 0}, - ok = rabbit_reader:send_on_channel0(Sock, Tune, Protocol), + ok = rabbit_reader:send_on_channel0(Sock, Tune, ?MODULE, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{user = User}} end. @@ -408,7 +405,7 @@ start_connection(<<"AMQP", 0, 0, 9, 1>>, server_properties = server_properties(Protocol), mechanisms = auth_mechanisms_binary(Sock), locales = <<"en_US">> }, - ok = rabbit_reader:send_on_channel0(Sock, Start, Protocol), + ok = rabbit_reader:send_on_channel0(Sock, Start, ?MODULE, Protocol), rabbit_reader:switch_callback(State#v1{connection = Connection#connection{ timeout_sec = ?NORMAL_TIMEOUT, protocol = Protocol}, diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 65ccca02..68f69e6e 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -45,14 +45,14 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, - Capabilities, Collector}) -> +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Module, Protocol, + User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = supervisor2:start_child( SupPid, {writer, {rabbit_writer, start_link, - [Sock, Channel, FrameMax, Protocol, ReaderPid]}, + [Sock, Channel, FrameMax, Module, Protocol, ReaderPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}), {ok, ChannelPid} = supervisor2:start_child( diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 707eb358..569b930f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -28,7 +28,7 @@ -export([all_channels/0, channel_cleanup/1, emit_stats/1, infos/2, internal_conserve_memory/2, maybe_close/1, send_exception/3, - send_on_channel0/3, switch_callback/3]). + send_on_channel0/4, switch_callback/3]). -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). @@ -400,12 +400,13 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, + module = Module, connection = #connection{protocol = Protocol}, sock = Sock}) -> case all_channels() of [] -> NewState = close_connection(State), - ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Module, Protocol), NewState; _ -> State end; @@ -454,8 +455,8 @@ server_frame_max() -> {ok, FrameMax} = application:get_env(rabbit, frame_max), FrameMax. -send_on_channel0(Sock, Method, Protocol) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). +send_on_channel0(Sock, Method, Module, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Module, Protocol). %%-------------------------------------------------------------------------- @@ -556,14 +557,15 @@ handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> handle_exception(State, Channel, Reason) -> send_exception(State, Channel, Reason). -send_exception(State = #v1{connection = #connection{protocol = Protocol}}, +send_exception(State = #v1{connection = #connection{protocol = Protocol}, + module = Module}, Channel, Reason) -> {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), terminate_channels(), State1 = close_connection(State), ok = rabbit_writer:internal_send_command( - State1#v1.sock, 0, CloseMethod, Protocol), + State1#v1.sock, 0, CloseMethod, Module, Protocol), State1. emit_stats(State = #v1{stats_timer = StatsTimer}) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index ac3434d2..d7c84eee 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,13 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([start/6, start_link/6, mainloop/2, mainloop1/2]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5]). --export([internal_send_command/4, internal_send_command/6]). +-export([internal_send_command/5, internal_send_command/7]). --record(wstate, {sock, channel, frame_max, protocol}). +-record(wstate, {sock, channel, frame_max, module, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -32,13 +32,13 @@ -ifdef(use_specs). --spec(start/5 :: +-spec(start/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), atom(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). --spec(start_link/5 :: +-spec(start_link/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), rabbit_types:protocol(), pid()) + non_neg_integer(), atom(), rabbit_types:protocol(), pid()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -57,34 +57,36 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(internal_send_command/4 :: +-spec(internal_send_command/5 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - rabbit_framing:amqp_method_record(), rabbit_types:protocol()) + rabbit_framing:amqp_method_record(), atom(), rabbit_types:protocol()) -> 'ok'). --spec(internal_send_command/6 :: +-spec(internal_send_command/7 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:content(), - non_neg_integer(), rabbit_types:protocol()) + non_neg_integer(), atom(), rabbit_types:protocol()) -> 'ok'). -endif. %%--------------------------------------------------------------------------- -start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> +start(Sock, Channel, FrameMax, Module, Protocol, ReaderPid) -> {ok, proc_lib:spawn(?MODULE, mainloop, [ReaderPid, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, + module = Module, protocol = Protocol}])}. -start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> +start_link(Sock, Channel, FrameMax, Module, Protocol, ReaderPid) -> {ok, proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, + module = Module, protocol = Protocol}])}. mainloop(ReaderPid, State) -> @@ -165,20 +167,12 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- -assemble_frame(Channel, MethodRecord, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord, Protocol). - -assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, Content), - MethodName = rabbit_misc:method_record_type(MethodRecord), - true = Protocol:method_has_content(MethodName), % assertion - MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord, Protocol), - ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax, Protocol), - [MethodFrame | ContentFrames]. +assemble_frame(Channel, MethodRecord, Module, Protocol) -> + Module:assemble_frame(Channel, MethodRecord, Protocol). + + +assemble_frames(Channel, MethodRecord, Content, FrameMax, Module, Protocol) -> + Module:assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol). %% We optimise delivery of small messages. Content-bearing methods %% require at least three frames. Small messages always fit into @@ -200,14 +194,15 @@ tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)). +internal_send_command(Sock, Channel, MethodRecord, Module, Protocol) -> + ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, + Module, Protocol)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, - Protocol) -> + Module, Protocol) -> ok = send_frames(fun tcp_send/2, Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + Content, FrameMax, Module, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -230,17 +225,20 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, internal_send_command_async(MethodRecord, #wstate{sock = Sock, channel = Channel, + module = Module, protocol = Protocol}) -> - ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). + ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, + Module, Protocol)). internal_send_command_async(MethodRecord, Content, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, + module = Module, protocol = Protocol}) -> ok = send_frames(fun port_cmd/2, Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + Content, FrameMax, Module, Protocol)). port_cmd(Sock, Data) -> true = try rabbit_net:port_command(Sock, Data) |