summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rob@rabbitmq.com>2010-11-30 13:53:43 +0000
committerRob Harrop <rob@rabbitmq.com>2010-11-30 13:53:43 +0000
commitda9e6d2ba8ca5eba30dec82c6c06d1217bd1b7b9 (patch)
tree36e7a958024730417f669e72bd9951c097f4c14f
parent846b6a3f9efd003cdb066764bf7855207f4b0260 (diff)
parent0b3bade550d12574a398e4c84615e1e92a44cc70 (diff)
downloadrabbitmq-server-da9e6d2ba8ca5eba30dec82c6c06d1217bd1b7b9.tar.gz
Merged bug20284 into default
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--src/rabbit_net.erl85
-rw-r--r--src/rabbit_writer.erl16
4 files changed, 59 insertions, 51 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 5032f471..b37f7ab1 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -124,6 +124,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1
+- New Upstream Release
+
* Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index e81fda24..a60e691d 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (2.2.0-1) lucid; urgency=low
+
+ * New Upstream Release
+
+ -- Rob Harrop <rob@rabbitmq.com> Mon, 29 Nov 2010 12:24:48 +0000
+
rabbitmq-server (2.1.1-1) lucid; urgency=low
* New Upstream Release
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 0940dce2..89954b06 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -32,9 +32,9 @@
-module(rabbit_net).
-include("rabbit.hrl").
--export([async_recv/3, close/1, controlling_process/2,
- getstat/2, peername/1, peercert/1, port_command/2,
- send/2, sockname/1, is_ssl/1]).
+-export([is_ssl/1, controlling_process/2, getstat/2,
+ async_recv/3, port_command/2, send/2, close/1,
+ sockname/1, peername/1, peercert/1]).
%%---------------------------------------------------------------------------
@@ -49,26 +49,25 @@
-type(ok_or_any_error() :: rabbit_types:ok_or_error(any())).
-type(socket() :: port() | #ssl_socket{}).
+-spec(is_ssl/1 :: (socket()) -> boolean()).
+-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
+-spec(getstat/2 ::
+ (socket(), [stat_option()])
+ -> ok_val_or_error([{stat_option(), integer()}])).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
--spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
--spec(send/2 ::
- (socket(), binary() | iolist()) -> ok_or_any_error()).
+-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
+-spec(close/1 :: (socket()) -> ok_or_any_error()).
+-spec(sockname/1 ::
+ (socket())
+ -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(peername/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
--spec(sockname/1 ::
- (socket())
- -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
--spec(is_ssl/1 :: (socket()) -> boolean()).
--spec(getstat/2 ::
- (socket(), [stat_option()])
- -> ok_val_or_error([{stat_option(), integer()}])).
-endif.
@@ -76,6 +75,18 @@
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
+is_ssl(Sock) -> ?IS_SSL(Sock).
+
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
+ ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
+ inet:getstat(Sock#ssl_socket.tcp, Stats);
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
@@ -90,31 +101,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
-close(Sock) when ?IS_SSL(Sock) ->
- ssl:close(Sock#ssl_socket.ssl);
-close(Sock) when is_port(Sock) ->
- gen_tcp:close(Sock).
-
-controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
- ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
-controlling_process(Sock, Pid) when is_port(Sock) ->
- gen_tcp:controlling_process(Sock, Pid).
-
-getstat(Sock, Stats) when ?IS_SSL(Sock) ->
- inet:getstat(Sock#ssl_socket.tcp, Stats);
-getstat(Sock, Stats) when is_port(Sock) ->
- inet:getstat(Sock, Stats).
-
-peername(Sock) when ?IS_SSL(Sock) ->
- ssl:peername(Sock#ssl_socket.ssl);
-peername(Sock) when is_port(Sock) ->
- inet:peername(Sock).
-
-peercert(Sock) when ?IS_SSL(Sock) ->
- ssl:peercert(Sock#ssl_socket.ssl);
-peercert(Sock) when is_port(Sock) ->
- nossl.
-
port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
ok -> self() ! {inet_reply, Sock, ok},
@@ -124,16 +110,17 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
-send(Sock, Data) when ?IS_SSL(Sock) ->
- ssl:send(Sock#ssl_socket.ssl, Data);
-send(Sock, Data) when is_port(Sock) ->
- gen_tcp:send(Sock, Data).
+send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data);
+send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
+
+close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
+close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
+sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
+sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
-sockname(Sock) when ?IS_SSL(Sock) ->
- ssl:sockname(Sock#ssl_socket.ssl);
-sockname(Sock) when is_port(Sock) ->
- inet:sockname(Sock).
+peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl);
+peername(Sock) when is_port(Sock) -> inet:peername(Sock).
-is_ssl(Sock) ->
- ?IS_SSL(Sock).
+peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
+peercert(Sock) when is_port(Sock) -> nossl.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 50bca390..1b4710c6 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -34,8 +34,9 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, mainloop/2, mainloop1/2]).
--export([send_command/2, send_command/3, send_command_sync/2,
- send_command_sync/3, send_command_and_notify/5]).
+-export([send_command/2, send_command/3,
+ send_command_sync/2, send_command_sync/3,
+ send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
@@ -66,6 +67,9 @@
-spec(send_command_sync/3 ::
(pid(), rabbit_framing:amqp_method_record(), rabbit_types:content())
-> 'ok').
+-spec(send_command_and_notify/4 ::
+ (pid(), pid(), pid(), rabbit_framing:amqp_method_record())
+ -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
@@ -130,6 +134,10 @@ handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}},
ok = internal_send_command_async(MethodRecord, Content, State),
gen_server:reply(From, ok),
State;
+handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) ->
+ ok = internal_send_command_async(MethodRecord, State),
+ rabbit_amqqueue:notify_sent(QPid, ChPid),
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State) ->
ok = internal_send_command_async(MethodRecord, Content, State),
@@ -158,6 +166,10 @@ send_command_sync(W, MethodRecord) ->
send_command_sync(W, MethodRecord, Content) ->
call(W, {send_command_sync, MethodRecord, Content}).
+send_command_and_notify(W, Q, ChPid, MethodRecord) ->
+ W ! {send_command_and_notify, Q, ChPid, MethodRecord},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.