summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-08-03 16:10:29 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-08-03 16:10:29 +0100
commitb407519e245db1fb0c69c57e86ca7a2dfd534a5c (patch)
tree37867f142ee681fb8807acbf657f3b4ca83dcf74
parent8bbc16019d3def0c4a98135f852e11e31cc4c9e1 (diff)
downloadrabbitmq-server-b407519e245db1fb0c69c57e86ca7a2dfd534a5c.tar.gz
Also abstract assemble_frames, duh. Also lose send_close_frame/1, dumb idea.
-rw-r--r--src/rabbit_amqp.erl1
-rw-r--r--src/rabbit_amqp_0_x.erl21
-rw-r--r--src/rabbit_channel_sup.erl6
-rw-r--r--src/rabbit_reader.erl14
-rw-r--r--src/rabbit_writer.erl64
5 files changed, 51 insertions, 55 deletions
diff --git a/src/rabbit_amqp.erl b/src/rabbit_amqp.erl
index c510a494..9c748a77 100644
--- a/src/rabbit_amqp.erl
+++ b/src/rabbit_amqp.erl
@@ -22,7 +22,6 @@ behaviour_info(callbacks) ->
[
{accept_handshake_bytes, 1},
{start_connection, 2},
- {send_close_frame, 1},
{handle_input, 3},
{assemble_frame, 3},
{assemble_frames, 5}
diff --git a/src/rabbit_amqp_0_x.erl b/src/rabbit_amqp_0_x.erl
index a7f5891b..69a855a4 100644
--- a/src/rabbit_amqp_0_x.erl
+++ b/src/rabbit_amqp_0_x.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([accept_handshake_bytes/1, start_connection/2, send_close_frame/1,
- handle_input/3, assemble_frame/3, assemble_frames/5]).
+-export([accept_handshake_bytes/1, start_connection/2, handle_input/3,
+ assemble_frame/3, assemble_frames/5]).
-export([process_channel_frame/5]). %% used by erlang-client TODO
@@ -94,9 +94,6 @@ accept_handshake_bytes(<<"AMQP", 1, 1, 9, 1>>) ->
accept_handshake_bytes(_) ->
false.
-send_close_frame(Sock) ->
- exit(bang).
-
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
rabbit_reader:ensure_stats_timer(
rabbit_reader:switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
@@ -275,7 +272,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
stats_timer = StatsTimer}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- ok = rabbit_reader:send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ ok = rabbit_reader:send_on_channel0(Sock, #'connection.open_ok'{}, ?MODULE, Protocol),
State1 = rabbit_reader:internal_conserve_memory(
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
State#v1{connection_state = running,
@@ -296,7 +293,7 @@ handle_method0(#'connection.close'{},
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = rabbit_reader:send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ ok = rabbit_reader:send_on_channel0(Sock, #'connection.close_ok'{}, ?MODULE, Protocol),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -319,8 +316,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
capabilities = Capabilities}} = State,
{ok, _ChSupPid, {ChPid, AState}} =
rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
- VHost, Capabilities, Collector}),
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), ?MODULE, Protocol,
+ User, VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
NewAState = process_channel_frame(AnalyzedFrame, self(),
Channel, ChPid, AState),
@@ -384,13 +381,13 @@ auth_phase(Response,
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
Secure = #'connection.secure'{challenge = Challenge},
- ok = rabbit_reader:send_on_channel0(Sock, Secure, Protocol),
+ ok = rabbit_reader:send_on_channel0(Sock, Secure, ?MODULE, Protocol),
State#v1{auth_state = AuthState1};
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = rabbit_reader:server_frame_max(),
heartbeat = 0},
- ok = rabbit_reader:send_on_channel0(Sock, Tune, Protocol),
+ ok = rabbit_reader:send_on_channel0(Sock, Tune, ?MODULE, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User}}
end.
@@ -408,7 +405,7 @@ start_connection(<<"AMQP", 0, 0, 9, 1>>,
server_properties = server_properties(Protocol),
mechanisms = auth_mechanisms_binary(Sock),
locales = <<"en_US">> },
- ok = rabbit_reader:send_on_channel0(Sock, Start, Protocol),
+ ok = rabbit_reader:send_on_channel0(Sock, Start, ?MODULE, Protocol),
rabbit_reader:switch_callback(State#v1{connection = Connection#connection{
timeout_sec = ?NORMAL_TIMEOUT,
protocol = Protocol},
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index 65ccca02..68f69e6e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -45,14 +45,14 @@
%%----------------------------------------------------------------------------
-start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost,
- Capabilities, Collector}) ->
+start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Module, Protocol,
+ User, VHost, Capabilities, Collector}) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, WriterPid} =
supervisor2:start_child(
SupPid,
{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid]},
+ [Sock, Channel, FrameMax, Module, Protocol, ReaderPid]},
intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
{ok, ChannelPid} =
supervisor2:start_child(
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 707eb358..569b930f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -28,7 +28,7 @@
-export([all_channels/0, channel_cleanup/1, emit_stats/1, infos/2,
internal_conserve_memory/2, maybe_close/1, send_exception/3,
- send_on_channel0/3, switch_callback/3]).
+ send_on_channel0/4, switch_callback/3]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -400,12 +400,13 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
+ module = Module,
connection = #connection{protocol = Protocol},
sock = Sock}) ->
case all_channels() of
[] ->
NewState = close_connection(State),
- ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Module, Protocol),
NewState;
_ -> State
end;
@@ -454,8 +455,8 @@ server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
-send_on_channel0(Sock, Method, Protocol) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
+send_on_channel0(Sock, Method, Module, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Module, Protocol).
%%--------------------------------------------------------------------------
@@ -556,14 +557,15 @@ handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
handle_exception(State, Channel, Reason) ->
send_exception(State, Channel, Reason).
-send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+send_exception(State = #v1{connection = #connection{protocol = Protocol},
+ module = Module},
Channel, Reason) ->
{0, CloseMethod} =
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
terminate_channels(),
State1 = close_connection(State),
ok = rabbit_writer:internal_send_command(
- State1#v1.sock, 0, CloseMethod, Protocol),
+ State1#v1.sock, 0, CloseMethod, Module, Protocol),
State1.
emit_stats(State = #v1{stats_timer = StatsTimer}) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index ac3434d2..d7c84eee 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,13 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/6, start_link/6, mainloop/2, mainloop1/2]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
--export([internal_send_command/4, internal_send_command/6]).
+-export([internal_send_command/5, internal_send_command/7]).
--record(wstate, {sock, channel, frame_max, protocol}).
+-record(wstate, {sock, channel, frame_max, module, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -32,13 +32,13 @@
-ifdef(use_specs).
--spec(start/5 ::
+-spec(start/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), atom(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start_link/5 ::
+-spec(start_link/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid())
+ non_neg_integer(), atom(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -57,34 +57,36 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
--spec(internal_send_command/4 ::
+-spec(internal_send_command/5 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- rabbit_framing:amqp_method_record(), rabbit_types:protocol())
+ rabbit_framing:amqp_method_record(), atom(), rabbit_types:protocol())
-> 'ok').
--spec(internal_send_command/6 ::
+-spec(internal_send_command/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:content(),
- non_neg_integer(), rabbit_types:protocol())
+ non_neg_integer(), atom(), rabbit_types:protocol())
-> 'ok').
-endif.
%%---------------------------------------------------------------------------
-start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+start(Sock, Channel, FrameMax, Module, Protocol, ReaderPid) ->
{ok,
proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
+ module = Module,
protocol = Protocol}])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+start_link(Sock, Channel, FrameMax, Module, Protocol, ReaderPid) ->
{ok,
proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
+ module = Module,
protocol = Protocol}])}.
mainloop(ReaderPid, State) ->
@@ -165,20 +167,12 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
-assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord, Protocol).
-
-assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
- MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = Protocol:method_has_content(MethodName), % assertion
- MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord, Protocol),
- ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax, Protocol),
- [MethodFrame | ContentFrames].
+assemble_frame(Channel, MethodRecord, Module, Protocol) ->
+ Module:assemble_frame(Channel, MethodRecord, Protocol).
+
+
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Module, Protocol) ->
+ Module:assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol).
%% We optimise delivery of small messages. Content-bearing methods
%% require at least three frames. Small messages always fit into
@@ -200,14 +194,15 @@ tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
- ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
+internal_send_command(Sock, Channel, MethodRecord, Module, Protocol) ->
+ ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord,
+ Module, Protocol)).
internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
- Protocol) ->
+ Module, Protocol) ->
ok = send_frames(fun tcp_send/2, Sock,
assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ Content, FrameMax, Module, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -230,17 +225,20 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
internal_send_command_async(MethodRecord,
#wstate{sock = Sock,
channel = Channel,
+ module = Module,
protocol = Protocol}) ->
- ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)).
+ ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord,
+ Module, Protocol)).
internal_send_command_async(MethodRecord, Content,
#wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
+ module = Module,
protocol = Protocol}) ->
ok = send_frames(fun port_cmd/2, Sock,
assemble_frames(Channel, MethodRecord,
- Content, FrameMax, Protocol)).
+ Content, FrameMax, Module, Protocol)).
port_cmd(Sock, Data) ->
true = try rabbit_net:port_command(Sock, Data)