summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-13 14:54:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-13 14:54:20 +0100
commit171f2949e080cb910a5681f5578ea562882a3ae1 (patch)
treeacd2851e8e01e21fdb827550253c15940c8d8dcc
parent80da240c80422a618795cde171fff05b8ddfd39e (diff)
downloadrabbitmq-server-171f2949e080cb910a5681f5578ea562882a3ae1.tar.gz
Remove writer flush and improve sync methods
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_writer.erl46
2 files changed, 21 insertions, 28 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 493b1542..835d3f0d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -403,8 +403,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = rollback_and_notify(State),
- ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- ok = rabbit_writer:flush(WriterPid),
+ ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}),
stop;
handle_method(#'access.request'{},_, State) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 7c2da24f..9242593f 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, flush/1, mainloop/2, mainloop1/2]).
--export([send_command/2, send_command/3, send_command_and_signal_back/3,
- send_command_and_signal_back/4, send_command_and_notify/5]).
+-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([send_command/2, send_command/3, send_command_sync/2,
+ send_command_sync/3, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
@@ -56,17 +56,15 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(flush/1 :: (pid()) -> 'ok').
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
--spec(send_command_and_signal_back/3 ::
- (pid(), rabbit_framing:amqp_method(), pid()) -> 'ok').
--spec(send_command_and_signal_back/4 ::
- (pid(), rabbit_framing:amqp_method(), rabbit_types:content(), pid())
- -> 'ok').
+-spec(send_command_sync/2 ::
+ (pid(), rabbit_framing:amqp_method()) -> 'ok').
+-spec(send_command_sync/3 ::
+ (pid(), rabbit_framing:amqp_method(), rabbit_types:content()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -129,20 +127,20 @@ handle_message({send_command, MethodRecord, Content},
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax, Protocol),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Parent},
+handle_message({'$writer_call', From, MethodRecord},
State = #wstate{sock = Sock, channel = Channel,
protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+ gen_server:reply(From, ok),
State;
-handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
+handle_message({'$writer_call', From, {MethodRecord, Content}},
State = #wstate{sock = Sock,
channel = Channel,
frame_max = FrameMax,
protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax, Protocol),
- Parent ! rabbit_writer_send_command_signal,
+ gen_server:reply(From, ok),
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
@@ -157,9 +155,6 @@ handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
-handle_message({flush, Pid, Ref}, State) ->
- Pid ! Ref,
- State;
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).
@@ -173,22 +168,21 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
-send_command_and_signal_back(W, MethodRecord, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Parent},
- ok.
+send_command_sync(W, MethodRecord) ->
+ call(W, MethodRecord).
-send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
- W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
- ok.
+send_command_sync(W, MethodRecord, Content) ->
+ call(W, {MethodRecord, Content}).
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.
-flush(W) ->
- Ref = make_ref(),
- W ! {flush, self(), Ref},
- receive Ref -> ok end.
+%---------------------------------------------------------------------------
+
+call(Pid, Msg) ->
+ {ok, Res} = gen:call(Pid, '$writer_call', Msg, infinity),
+ Res.
%---------------------------------------------------------------------------