summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.pod2
-rw-r--r--include/rabbit.hrl9
-rw-r--r--packaging/RPMS/Fedora/Makefile6
-rw-r--r--packaging/RPMS/Fedora/init.d2
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec13
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile66
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper23
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper2
-rw-r--r--packaging/windows/rabbitmq-service.pod6
-rw-r--r--src/rabbit.erl12
-rw-r--r--src/rabbit_access_control.erl181
-rw-r--r--src/rabbit_amqqueue.erl163
-rw-r--r--src/rabbit_amqqueue_process.erl320
-rw-r--r--src/rabbit_basic.erl75
-rw-r--r--src/rabbit_channel.erl310
-rw-r--r--src/rabbit_error_logger.erl10
-rw-r--r--src/rabbit_exchange.erl240
-rw-r--r--src/rabbit_limiter.erl18
-rw-r--r--src/rabbit_misc.erl20
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_persister.erl2
-rw-r--r--src/rabbit_reader.erl4
-rw-r--r--src/rabbit_router.erl10
25 files changed, 833 insertions, 682 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index c938548f..d0a27a36 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -196,7 +196,7 @@ name
URL-encoded name of the exchange
type
- exchange type (B<direct>, B<topic> or B<fanout>)
+ exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>)
durable
whether the exchange survives server restarts
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d07aeaf8..a026602a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -62,6 +62,8 @@
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(delivery, {mandatory, immediate, txn, sender, message}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -127,6 +129,12 @@
content :: content(),
persistent_key :: maybe(pkey())}).
-type(message() :: basic_message()).
+-type(delivery() ::
+ #delivery{mandatory :: bool(),
+ immediate :: bool(),
+ txn :: maybe(txn()),
+ sender :: pid(),
+ message :: message()}).
%% this really should be an abstract type
-type(msg_id() :: non_neg_integer()).
-type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}).
@@ -136,6 +144,7 @@
host :: string() | atom(),
port :: non_neg_integer()}).
-type(not_found() :: {'error', 'not_found'}).
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-endif.
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index 9fe91b98..c74d4533 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -13,12 +13,10 @@ endif
ifeq "x$(RPM_OS)" "xsuse"
REQUIRES=/sbin/chkconfig /sbin/service
-OS_DEFINES=--define '_initrddir /etc/init.d'
-RELEASE_OS=.suse
+OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse'
else
REQUIRES=chkconfig initscripts
OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
-RELEASE_OS=
endif
rpms: clean server
@@ -27,7 +25,7 @@ prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index a9155f3b..77a6a89a 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -8,6 +8,8 @@
### BEGIN INIT INFO
# Provides: rabbitmq-server
+# Default-Start:
+# Default-Stop:
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
# Description: RabbitMQ broker
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 6bf3b841..875381e8 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -2,7 +2,7 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1%%RELEASE_OS%%
+Release: 1%{?dist}
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
@@ -34,7 +34,11 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
-make %{?_smp_mflags}
+
+# The rabbitmq build needs escript, which is missing from /usr/bin in
+# some versions of the erlang RPM. See
+# <https://bugzilla.redhat.com/show_bug.cgi?id=481302>
+PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags}
%install
rm -rf %{buildroot}
@@ -107,12 +111,15 @@ fi
%{_rabbit_libdir}
%{_initrddir}/rabbitmq-server
%config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server
-%doc LICENSE LICENSE-MPL-RabbitMQ INSTALL
+%doc LICENSE LICENSE-MPL-RabbitMQ
%clean
rm -rf %{buildroot}
%changelog
+* Tue May 19 2009 Matthias Radestock <matthias@lshift.net> 1.5.5-1
+- Maintenance release for the 1.5.x series
+
* Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1
- Maintenance release for the 1.5.x series
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index d1ccd3a0..7c5673f7 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.5.5-1) hardy; urgency=low
+
+ * New Upstream Release
+
+ -- Matthias Radestock <matthias@lshift.net> Tue, 19 May 2009 09:57:54 +0100
+
rabbitmq-server (1.5.4-1) hardy; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index b2b3ab02..21636072 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -7,7 +7,7 @@ Standards-Version: 3.8.0
Package: rabbitmq-server
Architecture: all
-Depends: erlang-nox, adduser, logrotate, ${misc:Depends}
+Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends}
Description: An AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index d9d16dbb..7fb31ad6 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -1,15 +1,15 @@
# -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4
# $Id$
-PortSystem 1.0
-
-name rabbitmq-server
-version 1.5.3
-categories net
-maintainers tonyg@rabbitmq.com
-platforms darwin
-description The RabbitMQ AMQP Server
-long_description \
+PortSystem 1.0
+name rabbitmq-server
+version 1.5.3
+revision 0
+categories net
+maintainers tonyg@rabbitmq.com
+platforms darwin
+description The RabbitMQ AMQP Server
+long_description \
RabbitMQ is an implementation of AMQP, the emerging standard for \
high performance enterprise messaging. The RabbitMQ server is a \
robust and scalable implementation of an AMQP broker.
@@ -32,6 +32,8 @@ set serverhome ${prefix}/var/lib/rabbitmq
set logdir ${prefix}/var/log/rabbitmq
set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
+set sbindir ${destroot}${prefix}/lib/rabbitmq/bin
+set wrappersbin ${destroot}${prefix}/sbin
use_configure no
@@ -41,7 +43,7 @@ build.args PYTHON=${prefix}/bin/python2.5
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
- SBIN_DIR=${destroot}${prefix}/sbin \
+ SBIN_DIR=${sbindir} \
MAN_DIR=${destroot}${prefix}/share/man
destroot.keepdirs \
@@ -59,32 +61,36 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \
- ${destroot}${prefix}/sbin/rabbitmq-multi \
- ${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl
+ ${sbindir}/rabbitmq-multi \
+ ${sbindir}/rabbitmq-server \
+ ${sbindir}/rabbitmqctl
- file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real
- xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin
- file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl
+ xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
+ ${wrappersbin}/rabbitmq-multi
+
+ reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
+ ${wrappersbin}/rabbitmq-multi
+ reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
+ ${wrappersbin}/rabbitmq-multi
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
+ file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
- reinplace -E "s:@PREFIX@:${prefix}:" \
- ${destroot}${prefix}/sbin/rabbitmqctl
}
pre-install {
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
new file mode 100644
index 00000000..b806049c
--- /dev/null
+++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Escape spaces and quotes, because shell is revolting.
+for arg in "$@" ; do
+ # Escape quotes in parameters, so that they're passed through cleanly.
+ arg=$(sed -e 's/"/\\"/' <<-END
+ $arg
+ END
+ )
+ CMDLINE="${CMDLINE} \"${arg}\""
+done
+
+cd /var/lib/rabbitmq
+
+SCRIPT=`basename $0`
+
+if [ `id -u` = 0 ] ; then
+ sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}
+else
+ /usr/lib/rabbitmq/bin/${SCRIPT}
+ echo -e "\nOnly root should run ${SCRIPT}\n"
+ exit 1
+fi
+
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
deleted file mode 100644
index 1996811e..00000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
+++ /dev/null
@@ -1,2 +0,0 @@
-#!/bin/bash
-exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@"
diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod
index 92648076..7c4d3ef2 100644
--- a/packaging/windows/rabbitmq-service.pod
+++ b/packaging/windows/rabbitmq-service.pod
@@ -92,10 +92,8 @@ Defaults to 5672.
=head2 ERLANG_SERVICE_MANAGER_PATH
-Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>
-(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit
-environments). This is the installation location of the Erlang service
-manager.
+Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>. This is
+the installation location of the Erlang service manager.
=head2 CLUSTER_CONFIG_FILE
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 97bbdd99..c8c814b6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -238,10 +238,14 @@ print_banner() ->
[Product, Version,
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- io:format("Logging to ~p~nSASL logging to ~p~n~n",
- [log_location(kernel), log_location(sasl)]).
-
-
+ Settings = [{"node", node()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
+ DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
+ Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
+ lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
+ io:nl().
start_child(Mod) ->
{ok,_} = supervisor:start_child(rabbit_sup,
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 4a0ff79e..54348d9a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -34,11 +34,12 @@
-include("rabbit.hrl").
-export([check_login/2, user_pass_login/2,
- check_vhost_access/2]).
+ check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
--export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]).
--export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]).
+-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
+-export([set_permissions/5, clear_permissions/2,
+ list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -47,6 +48,8 @@
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
+-spec(check_resource_access/3 ::
+ (username(), r(atom()), non_neg_integer()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -55,10 +58,13 @@
-spec(add_vhost/1 :: (vhost()) -> 'ok').
-spec(delete_vhost/1 :: (vhost()) -> 'ok').
-spec(list_vhosts/0 :: () -> [vhost()]).
--spec(list_vhost_users/1 :: (vhost()) -> [username()]).
--spec(list_user_vhosts/1 :: (username()) -> [vhost()]).
--spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok').
--spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok').
+-spec(set_permissions/5 ::
+ (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
+-spec(list_vhost_permissions/1 ::
+ (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
-endif.
@@ -112,9 +118,9 @@ internal_lookup_vhost_access(Username, VHostPath) ->
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:match_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath}) of
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
[] -> not_found;
[R] -> {ok, R}
end
@@ -131,13 +137,47 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+check_resource_access(Username,
+ R = #resource{kind = exchange, name = <<"">>},
+ Permission) ->
+ check_resource_access(Username,
+ R#resource{name = <<"amq.default">>},
+ Permission);
+check_resource_access(_Username,
+ #resource{name = <<"amq.gen",_/binary>>},
+ _Permission) ->
+ ok;
+check_resource_access(Username,
+ R = #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ Res = case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ case regexp:match(
+ binary_to_list(Name),
+ binary_to_list(element(Permission, P))) of
+ {match, _, _} -> true;
+ nomatch -> false
+ end
+ end,
+ if Res -> ok;
+ true -> rabbit_misc:protocol_error(
+ access_refused, "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(R), Username])
+ end.
+
add_user(Username, Password) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({user, Username}) of
+ case mnesia:wread({rabbit_user, Username}) of
[] ->
- ok = mnesia:write(#user{username = Username,
- password = Password});
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write);
_ ->
mnesia:abort({user_already_exists, Username})
end
@@ -150,10 +190,6 @@ delete_user(Username) ->
rabbit_misc:with_user(
Username,
fun () ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_access_control.erl
- ok = mnesia:delete({user, Username}),
- ok = mnesia:delete({user_vhost, Username})
-=======
ok = mnesia:delete({rabbit_user, Username}),
[ok = mnesia:delete_object(
rabbit_user_permission, R, write) ||
@@ -165,7 +201,6 @@ delete_user(Username) ->
permission = '_'},
write)],
ok
->>>>>>> /tmp/rabbit_access_control.erl~other.wd6AMx
end)),
rabbit_log:info("Deleted user ~p~n", [Username]),
R.
@@ -175,24 +210,28 @@ change_password(Username, Password) ->
rabbit_misc:with_user(
Username,
fun () ->
- ok = mnesia:write(#user{username = Username,
- password = Password})
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write)
end)),
rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
list_users() ->
- mnesia:dirty_all_keys(user).
+ mnesia:dirty_all_keys(rabbit_user).
lookup_user(Username) ->
- rabbit_misc:dirty_read({user, Username}).
+ rabbit_misc:dirty_read({rabbit_user, Username}).
add_vhost(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({vhost, VHostPath}) of
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
[] ->
- ok = mnesia:write(#vhost{virtual_host = VHostPath}),
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
@@ -200,6 +239,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
@@ -232,53 +273,79 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun (Username) ->
- ok = unmap_user_vhost(Username, VHostPath)
+ lists:foreach(fun ({Username, _, _, _}) ->
+ ok = clear_permissions(Username, VHostPath)
end,
- list_vhost_users(VHostPath)),
- ok = mnesia:delete({vhost, VHostPath}),
+ list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
list_vhosts() ->
- mnesia:dirty_all_keys(vhost).
+ mnesia:dirty_all_keys(rabbit_vhost).
-list_vhost_users(VHostPath) ->
- [Username ||
- #user_vhost{username = Username} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () -> mnesia:index_read(user_vhost, VHostPath,
- #user_vhost.virtual_host)
- end))].
-
-list_user_vhosts(Username) ->
- [VHostPath ||
- #user_vhost{virtual_host = VHostPath} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () -> mnesia:read({user_vhost, Username}) end))].
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case regexp:parse(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
-map_user_vhost(Username, VHostPath) ->
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
- fun () ->
- ok = mnesia:write(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
end)).
-unmap_user_vhost(Username, VHostPath) ->
+clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
- ok = mnesia:delete_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
end)).
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_vhost(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9b79cc15..eb076e94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,18 +32,18 @@
-module(rabbit_amqqueue).
-export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]).
--export([conserve_memory/1, conserve_memory/2, pseudo_queue/2]).
+-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
--import(gen_server).
+-import(gen_server2).
-import(lists).
-import(queue).
@@ -64,8 +64,6 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(conserve_memory/1 :: (bool()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
-spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
@@ -90,18 +88,20 @@
-spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
--spec(commit/2 :: (pid(), txn()) -> 'ok' | {'error', any()}).
--spec(rollback/2 :: (pid(), txn()) -> 'ok' | {'error', any()}).
+-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
+-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -124,21 +124,6 @@ recover() ->
recover_durable_queues() ->
Node = node(),
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
-=======
lists:foreach(
fun (RecoveredQ) ->
Q = start_queue_process(RecoveredQ),
@@ -165,20 +150,6 @@ recover_durable_queues() ->
node(Pid) == Node]))
end)),
ok.
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
-
-conserve_memory(Conserve) ->
- [ok = gen_server:cast(QPid, {conserve_memory, Conserve}) ||
- {_, QPid, worker, _} <-
- supervisor:which_children(rabbit_amqqueue_sup)],
- ok.
-
-conserve_memory(QPid, Conserve) ->
- %% This needs to be synchronous. It is called during queue
- %% creation and we need to make sure that the memory conservation
- %% status of the queue has been set before it becomes reachable in
- %% message routing.
- gen_server:call(QPid, {conserve_memory, Conserve}).
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -188,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
ok = add_default_binding(Q),
Q;
@@ -201,16 +172,15 @@ declare(QueueName, Durable, AutoDelete, Args) ->
end.
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(durable_queues, Q, write),
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok.
start_queue_process(Q) ->
{ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]),
- ok = rabbit_alarm:maybe_conserve_memory(Pid),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -220,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) ->
ok.
lookup(Name) ->
- rabbit_misc:dirty_read({amqqueue, Name}).
+ rabbit_misc:dirty_read({rabbit_queue, Name}).
with(Name, F, E) ->
case lookup(Name) of
@@ -237,23 +207,16 @@ with_or_die(Name, F) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, info).
-=======
gen_server2:pcall(QPid, 9, info, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
info(#amqqueue{ pid = QPid }, Items) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- case gen_server:call(QPid, {info, Items}) of
-=======
case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -262,68 +225,44 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
-stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
-=======
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)).
+ lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {delete, IfUnused, IfEmpty}).
-=======
gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
-purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge).
-=======
purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
deliver(_IsMandatory, true, Txn, Message, QPid) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {deliver_immediately, Txn, Message});
-=======
gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
deliver(true, _IsImmediate, Txn, Message, QPid) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {deliver, Txn, Message}),
-=======
gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server:cast(QPid, {deliver, Txn, Message}),
+ gen_server2:cast(QPid, {deliver, Txn, Message}),
true.
redeliver(QPid, Messages) ->
- gen_server:cast(QPid, {redeliver, Messages}).
+ gen_server2:cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
-=======
fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
@@ -331,78 +270,50 @@ notify_down_all(QPids, ChPid) ->
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
-=======
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {claim_queue, ReaderPid}).
-=======
gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {basic_get, ChPid, NoAck}).
-=======
gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
-=======
gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
-=======
ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
infinity).
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
notify_sent(QPid, ChPid) ->
- gen_server:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
+
+unblock(QPid, ChPid) ->
+ gen_server2:cast(QPid, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- case mnesia:wread({amqqueue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
- ok = mnesia:delete({durable_queues, QueueName}),
-=======
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] ->
ok = rabbit_exchange:delete_queue_bindings(QueueName),
ok = mnesia:delete({rabbit_queue, QueueName}),
ok = mnesia:delete({rabbit_durable_queue, QueueName}),
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
ok
end
end).
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok.
-
-=======
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -414,15 +325,9 @@ on_node_down(Node) ->
Acc
end,
ok,
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(amqqueue),
- node(Pid) == Node]))
-=======
qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
<- mnesia:table(rabbit_queue),
node(Pid) == Node]))
->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH
end).
pseudo_queue(QueueName, Pid) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7919be52..c390b2b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
@@ -51,23 +51,21 @@
owner,
exclusive_consumer,
has_had_consumers,
- conserve_memory,
- dropped_message_count,
next_msg_id,
message_buffer,
round_robin}).
-record(consumer, {tag, ack_required}).
--record(tx, {ch_pid, is_persistent, fail_reason,
- pending_messages, pending_acks}).
+-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -88,7 +86,7 @@
%%----------------------------------------------------------------------------
start_link(Q) ->
- gen_server:start_link(?MODULE, Q, []).
+ gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
@@ -98,16 +96,13 @@ init(Q) ->
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- conserve_memory = false,
- dropped_message_count = 0,
next_msg_id = 1,
message_buffer = queue:new(),
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
-terminate(_Reason, State = #q{dropped_message_count = C}) ->
+terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
- log_dropped_message_count(QName, C),
lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
all_tx()),
ok = purge_message_buffer(QName, State#q.message_buffer),
@@ -118,20 +113,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-log_dropped_message_count(_QName, 0) ->
- ok;
-log_dropped_message_count(QName, C) ->
- rabbit_log:warning("~s dropped ~p messages to conserve memory~n",
- [rabbit_misc:rs(QName), C]),
- ok.
-
-conserve_memory(false, State = #q{q = #amqqueue{name = QName},
- conserve_memory = true,
- dropped_message_count = C}) ->
- log_dropped_message_count(QName, C),
- State#q{conserve_memory = false, dropped_message_count = 0};
-conserve_memory(Conserve, State) ->
- State#q{conserve_memory = Conserve}.
+reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -148,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -161,45 +145,18 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-<<<<<<< local
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
-=======
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
->>>>>>> other
-
-<<<<<<< local
-increment_dropped_message_count(State) ->
- State#q{dropped_message_count = State#q.dropped_message_count + 1}.
-
-find_auto_ack_consumer(RoundRobin, RoundRobinNew) ->
- case queue:out(RoundRobin) of
- {{value, QEntry = {_, #consumer{ack_required = AckRequired}}},
- RoundRobinTail} ->
- case AckRequired of
- true -> find_auto_ack_consumer(
- RoundRobinTail,
- queue:in(QEntry, RoundRobinNew));
- false -> {QEntry, queue:join(RoundRobinNew, RoundRobinTail)}
- end;
- {empty, _} -> false
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
end.
-=======
+
deliver_immediately(Message, Delivered,
->>>>>>> other
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
next_msg_id = NextId}) ->
@@ -208,117 +165,60 @@ deliver_immediately(Message, Delivered,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
-<<<<<<< local
case not(AckRequired) orelse rabbit_limiter:can_send(
-
-deliver_to_consumer(Message, Delivered, QName, MsgId,
- QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}},
- RoundRobinTail) ->
- ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), MsgId, Delivered, Message}),
- C = #cr{unsent_message_count = Count, unacked_messages = UAM} =
- ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(MsgId, Message, UAM);
- false -> UAM
- end,
- NewConsumers = case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {AckRequired, NewConsumers}.
-
-deliver_immediately(Message, Delivered, true,
- State = #q{q = #amqqueue{name = QName},
- round_robin = RoundRobin,
- next_msg_id = NextId}) ->
- case queue:is_empty(RoundRobin) of
- true -> {not_offered, State};
- false -> case find_auto_ack_consumer(RoundRobin, queue:new()) of
- false ->
- {not_offered,
- increment_dropped_message_count(State)};
- {QEntry, RoundRobinTail} ->
- {AckRequired, NewRoundRobin} =
- deliver_to_consumer(
- Message, Delivered, QName, NextId,
- QEntry, RoundRobinTail),
- {offered, AckRequired,
- State#q{round_robin = NewRoundRobin,
- next_msg_id = NextId + 1}}
- end
- end;
-deliver_immediately(Message, Delivered, false,
-<<<<<<< local
- {{value, QEntry}, RoundRobinTail} ->
- {AckRequired, NewRoundRobin} =
- deliver_to_consumer(Message, Delivered, QName, NextId,
- QEntry, RoundRobinTail),
- {offered, AckRequired, State#q{round_robin = NewRoundRobin,
- next_msg_id = NextId + 1}};
-=======
->>>>>>> other
+ LimiterPid, self()) of
+ true ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
false ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
deliver_immediately(Message, Delivered,
State#q{round_robin = NewConsumers})
end;
-=======
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
->>>>>>> other
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
-attempt_delivery(none, Message, State = #q{conserve_memory = Conserve}) ->
- case deliver_immediately(Message, false, Conserve, State) of
+attempt_delivery(none, Message, State) ->
+ case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
{true, State1};
{offered, true, State1} ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
-attempt_delivery(Txn, Message, State = #q{conserve_memory = false}) ->
+attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
record_pending_message(Txn, Message),
- {true, State};
-attempt_delivery(Txn, _Message, State = #q{conserve_memory = true}) ->
- mark_tx_failed(Txn, dropped_messages_to_conserve_memory),
- {false, increment_dropped_message_count(State)}.
+ {true, State}.
deliver_or_enqueue(Txn, Message, State) ->
case attempt_delivery(Txn, Message, State) of
{true, NewState} ->
{true, NewState};
- {false, NewState = #q{conserve_memory = true}} ->
- {false, NewState};
- {false, NewState = #q{conserve_memory = false}} ->
+ {false, NewState} ->
persist_message(Txn, qname(State), Message),
NewMB = queue:in({Message, false}, NewState#q.message_buffer),
{false, NewState#q{message_buffer = NewMB}}
@@ -345,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -409,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -435,15 +341,15 @@ run_poke_burst(State = #q{message_buffer = MessageBuffer}) ->
run_poke_burst(MessageBuffer, State) ->
case queue:out(MessageBuffer) of
{{value, {Message, Delivered}}, BufferTail} ->
- case deliver_immediately(Message, Delivered, false, State) of
+ case deliver_immediately(Message, Delivered, State) of
{offered, true, NewState} ->
persist_delivery(qname(State), Message, Delivered),
run_poke_burst(BufferTail, NewState);
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -525,7 +431,6 @@ lookup_tx(Txn) ->
case get({txn, Txn}) of
undefined -> #tx{ch_pid = none,
is_persistent = false,
- fail_reason = none,
pending_messages = [],
pending_acks = []};
V -> V
@@ -551,14 +456,6 @@ is_tx_persistent(Txn) ->
#tx{is_persistent = Res} = lookup_tx(Txn),
Res.
-mark_tx_failed(Txn, Reason) ->
- Tx = lookup_tx(Txn),
- store_tx(Txn, Tx#tx{fail_reason = Reason}).
-
-tx_fail_reason(Txn) ->
- #tx{fail_reason = Res} = lookup_tx(Txn),
- Res.
-
record_pending_message(Txn, Message) ->
Tx = #tx{pending_messages = Pending} = lookup_tx(Txn),
store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}).
@@ -617,8 +514,8 @@ i(messages_uncommitted, _) ->
#tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
+ messages_unacknowledged,
+ messages_uncommitted]]);
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
@@ -635,8 +532,8 @@ i(Item, _) ->
%---------------------------------------------------------------------------
-handle_call({conserve_memory, Conserve}, _From, State) ->
- {reply, ok, conserve_memory(Conserve, State)};
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State);
handle_call({info, Items}, _From, State) ->
try
@@ -667,30 +564,16 @@ handle_call({deliver, Txn, Message}, _From, State) ->
reply(Delivered, NewState);
handle_call({commit, Txn}, From, State) ->
-<<<<<<< local
- NewState =
- case tx_fail_reason(Txn) of
- none -> ok = commit_work(Txn, qname(State)),
- %% optimisation: we reply straight away so the
- %% sender can continue
- gen_server:reply(From, ok),
- process_pending(Txn, State);
- Reason -> ok = rollback_work(Txn, qname(State)),
- gen_server:reply(From, {error, Reason}),
- State
- end,
-=======
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
NewState = process_pending(Txn, State),
->>>>>>> other
erase_tx(Txn),
noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
handle_ch_down(ChPid, State);
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -717,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -732,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -753,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -819,9 +711,6 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(locked, State)
end.
-handle_cast({conserve_memory, Conserve}, State) ->
- {noreply, conserve_memory(Conserve, State)};
-
handle_cast({deliver, Txn, Message}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State),
@@ -864,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -892,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]);
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
new file mode 100644
index 00000000..761b3863
--- /dev/null
+++ b/src/rabbit_basic.erl
@@ -0,0 +1,75 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_basic).
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+-export([publish/1, message/4, delivery/4]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(publish/1 :: (delivery()) ->
+ {ok, routing_result(), [pid()]} | not_found()).
+-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
+-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) ->
+ message()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+publish(Delivery = #delivery{
+ message = #basic_message{exchange_name = ExchangeName}}) ->
+ case rabbit_exchange:lookup(ExchangeName) of
+ {ok, X} ->
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids};
+ Other ->
+ Other
+ end.
+
+delivery(Mandatory, Immediate, Txn, Message) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
+ sender = self(), message = Message}.
+
+message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ Content = #content{class_id = ClassId,
+ properties = #'P_basic'{content_type = ContentTypeBin},
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]},
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKeyBin,
+ content = Content,
+ persistent_key = none}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5fd9a512..b2716ec4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,23 +33,29 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/4, do/2, do/3, shutdown/1]).
+-behaviour(gen_server2).
+
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-%% callbacks
--export([init/2, handle_message/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
+-define(HIBERNATE_AFTER, 1000).
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -61,112 +67,126 @@
%%----------------------------------------------------------------------------
-start_link(ReaderPid, WriterPid, Username, VHost) ->
- buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid,
- Username, VHost]).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+ {ok, Pid} = gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost], []),
+ Pid.
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- Pid ! {method, Method, Content},
- ok.
+ gen_server2:cast(Pid, {method, Method, Content}).
shutdown(Pid) ->
- Pid ! terminate,
- ok.
+ gen_server2:cast(Pid, terminate).
send_command(Pid, Msg) ->
- Pid ! {command, Msg},
- ok.
+ gen_server2:cast(Pid, {command, Msg}).
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- Pid ! {deliver, ConsumerTag, AckRequired, Msg},
- ok.
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
- ok.
+ gen_server2:cast(Pid, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
-init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
- %% this is bypassing the proxy so alarms can "jump the queue" and
- %% be handled promptly
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- #ch{state = starting,
- proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}.
-
-handle_message({method, Method, Content}, State) ->
+ {ok, #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new()}}.
+
+handle_call(_Request, _From, State) ->
+ noreply(State).
+
+handle_cast({method, Method, Content}, State) ->
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
+ noreply(NewState);
{noreply, NewState} ->
- NewState;
+ noreply(NewState);
stop ->
- exit(normal)
+ {stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- terminate({amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State);
+ ok = notify_queues(internal_rollback(State)),
+ Reason = {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)},
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State#ch{state = terminating}};
exit:normal ->
- terminate(normal, State);
+ {stop, normal, State};
_:Reason ->
- terminate({Reason, erlang:get_stacktrace()}, State)
+ {stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_message(terminate, State) ->
- terminate(normal, State);
+handle_cast(terminate, State) ->
+ {stop, normal, State};
-handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- State;
+ noreply(State);
-handle_message({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
- ok = internal_deliver(WriterPid, ProxyPid,
- true, ConsumerTag, DeliveryTag, Msg),
- State1#ch{next_tag = DeliveryTag + 1};
+ ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_message({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
+ ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- State;
+ noreply(State).
-handle_message({'EXIT', _Pid, Reason}, State) ->
- terminate(Reason, State);
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
-handle_message(Other, State) ->
- terminate({unexpected_channel_message, Other}, State).
+handle_info(timeout, State) ->
+ ok = clear_permission_cache(),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
-%%---------------------------------------------------------------------------
+terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ state = terminating}) ->
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid);
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
rabbit_writer:shutdown(WriterPid),
- exit(Reason).
+ rabbit_limiter:shutdown(LimiterPid).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) ->
{reply, Reply, NewState}
end.
+check_resource_access(Username, Resource, Perm) ->
+ V = {Resource, Perm},
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ CacheTail =
+ case lists:member(V, Cache) of
+ true -> lists:delete(V, Cache);
+ false -> ok = rabbit_access_control:check_resource_access(
+ Username, Resource, Perm),
+ lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
+ end,
+ put(permission_cache, [V | CacheTail]),
+ ok.
+
+clear_permission_cache() ->
+ erase(permission_cache),
+ ok.
+
+check_configure_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.configure).
+
+check_write_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.write).
+
+check_read_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.read).
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- ok = rabbit_writer:shutdown(WriterPid),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
@@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
+ Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
+ _, State = #ch{ reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binstring_guid("amq.ctag");
@@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping }) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% cancel_ok ourselves it might overtake a
%% message sent previously by the queue.
rabbit_amqqueue:basic_cancel(
- Q, ProxyPid, ConsumerTag,
+ Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
consumer_tag = ConsumerTag}))
end) of
@@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
- {reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter_pid = LimiterPid }) ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self()),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true},
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), ProxyPid)
+ QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
lists:foreach(
@@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false},
%%
%% FIXME: should we allocate a fresh DeliveryTag?
ok = internal_deliver(
- WriterPid, ProxyPid,
- false, ConsumerTag, DeliveryTag,
+ WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
%% No answer required, apparently!
@@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = rabbit_exchange:lookup_or_die(ExchangeName),
ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch { virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
Finish(rabbit_amqqueue:declare(QueueName,
Durable, AutoDelete, Args));
- Other -> Other
+ Other = #amqqueue{name = QueueName} ->
+ check_configure_permitted(QueueName, State),
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
@@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ check_configure_permitted(QueueName, State),
Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
@@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
@@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
@@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_write_permitted(QueueName, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
@@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
State#ch{tx_participants = sets:union(Participants,
sets:from_list(MoreP))}.
-ack(ProxyPid, TxnKey, UAQ) ->
+ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid),
+ ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
[QPid | L]
end, [], UAQ).
@@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- ProxyPid).
+notify_queues(#ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+
+limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(undefined, _Acked) ->
+ ok;
+notify_limiter(LimiterPid, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
@@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n",
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
[Other]),
false
end.
@@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
+internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
routing_key = RoutingKey},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, ChPid, M, Content);
+ WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index dc5824f1..76016a8c 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -74,7 +74,11 @@ publish(_Other, _Format, _Data, _State) ->
ok.
publish1(RoutingKey, Format, Data, LogExch) ->
- {ok, _QueueNames} = rabbit_exchange:simple_publish(
- false, false, LogExch, RoutingKey, <<"text/plain">>,
- list_to_binary(io_lib:format(Format, Data))),
+ {ok, _RoutingRes, _DeliveredQPids} =
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, none,
+ rabbit_basic:message(
+ LogExch, RoutingKey, <<"text/plain">>,
+ list_to_binary(io_lib:format(Format, Data))))),
ok.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index e0e3113f..3b6338c7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,16 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
--export([delete_bindings_for_queue/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
-=======
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -84,7 +79,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -96,6 +91,7 @@
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -110,24 +106,6 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute),
- Acc
- end, ok, durable_routes),
- ok
- end).
-=======
ok = rabbit_misc:table_foreach(
fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
Exchange, write)
@@ -139,7 +117,6 @@ recover() ->
ok = mnesia:write(rabbit_reverse_route,
ReverseRoute, write)
end, rabbit_durable_route).
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -149,11 +126,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> ok = mnesia:write(Exchange),
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
if Durable ->
- ok = mnesia:write(
- durable_exchanges, Exchange, write);
+ ok = mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
true -> ok
end,
Exchange;
@@ -167,6 +144,8 @@ check_type(<<"direct">>) ->
direct;
check_type(<<"topic">>) ->
topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
rabbit_misc:protocol_error(
command_invalid, "invalid exchange type '~s'", [T]).
@@ -180,7 +159,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
[rabbit_misc:rs(Name), ActualType, RequiredType]).
lookup(Name) ->
- rabbit_misc:dirty_read({exchange, Name}).
+ rabbit_misc:dirty_read({rabbit_exchange, Name}).
lookup_or_die(Name) ->
case lookup(Name) of
@@ -192,6 +171,7 @@ lookup_or_die(Name) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
map(VHostPath, F) ->
@@ -233,64 +213,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{exchange_name = Name,
_ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
-
-route(X = #exchange{type = fanout}, _) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
-route_internal(#exchange{name = Name}, RoutingKey) ->
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
key = RoutingKey,
_ = '_'}},
- lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
sets:fold(
fun(Key, Acc) ->
- case mnesia:dirty_read({amqqueue, Key}) of
+ case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
@@ -300,13 +296,6 @@ lookup_qpids(Queues) ->
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
-delete_bindings_for_exchange(ExchangeName) ->
- indexed_delete(
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- fun delete_forward_routes/1, fun mnesia:delete_object/1).
-=======
delete_exchange_bindings(ExchangeName) ->
[begin
ok = mnesia:delete_object(rabbit_reverse_route,
@@ -318,7 +307,6 @@ delete_exchange_bindings(ExchangeName) ->
_ = '_'}},
write)],
ok.
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -328,14 +316,7 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
- indexed_delete(
- reverse_route(#route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- fun mnesia:delete_object/1, fun delete_forward_routes/1),
[begin
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
- [X] = mnesia:read({exchange, ExchangeName}),
-=======
ok = FwdDeleteFun(reverse_route(Route)),
ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
end || Route <- mnesia:match_object(
@@ -346,21 +327,13 @@ delete_queue_bindings(QueueName, FwdDeleteFun) ->
write)],
[begin
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
ok = maybe_auto_delete(X)
end || ExchangeName <- Exchanges],
ok.
-indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) ->
- [begin
- ok = ReverseDeleteFun(reverse_route(Route)),
- ok = ForwardsDeleteFun(Route)
- end || Route <- mnesia:match_object(Match)],
- ok.
-
delete_forward_routes(Route) ->
- ok = mnesia:delete_object(Route),
- ok = mnesia:delete_object(durable_routes, Route, write).
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write).
@@ -372,17 +345,18 @@ exchanges_for_queue(QueueName) ->
_ = '_'}}),
sets:to_list(
sets:from_list(
- mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
has_bindings(ExchangeName) ->
MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
try
- continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read))
+ continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}],
+ 1, read))
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- case mnesia:match_object(MatchHead) of
+ case mnesia:match_object(rabbit_route, MatchHead, read) of
[] -> false;
[_|_] -> true
end
@@ -394,26 +368,13 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
- fun() -> case mnesia:read({exchange, Exchange}) of
- [] -> {error, exchange_not_found};
-=======
fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
[X] -> Fun(X)
end
end).
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
- call_with_exchange(
- Exchange,
- fun(X) -> case mnesia:read({amqqueue, Queue}) of
- [] -> {error, queue_not_found};
- [Q] -> Fun(X, Q)
- end
-=======
rabbit_misc:execute_mnesia_transaction(
fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
mnesia:read({rabbit_queue, Queue})} of
@@ -422,7 +383,6 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
{[_], [ ]} -> {error, queue_not_found};
{[ ], [ ]} -> {error, exchange_and_queue_not_found}
end
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
@@ -451,13 +411,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
- true -> Fun(durable_routes, #route{binding = Binding}, write);
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
false -> ok
end,
- [ok, ok] = [Fun(element(1, R), R, write) ||
- R <- tuple_to_list(route_with_reverse(Binding))],
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
list_bindings(VHostPath) ->
@@ -468,6 +430,7 @@ list_bindings(VHostPath) ->
queue_name = QueueName,
args = Arguments}}
<- mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
_ = '_'},
@@ -503,6 +466,67 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.
@@ -548,15 +572,9 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
-<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl
- ok = delete_bindings_for_exchange(ExchangeName),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName}).
-=======
ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
ok = mnesia:delete({rabbit_exchange, ExchangeName}).
->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym
%%----------------------------------------------------------------------------
%% EXTENDED API
@@ -572,7 +590,7 @@ list_exchange_bindings(ExchangeName) ->
#route{binding = #binding{queue_name = QueueName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
% Refactoring is left as an exercise for the reader
list_queue_bindings(QueueName) ->
@@ -582,4 +600,4 @@ list_queue_bindings(QueueName) ->
#route{binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3f9b6ebb..9f3dcbd0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid) ->
+can_send(undefined, _QPid, _AckRequired) ->
true;
-can_send(LimiterPid, QPid) ->
+can_send(LimiterPid, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
+ infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+handle_call({can_send, QPid, AckRequired}, _From,
+ State = #lim{volume = Volume}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = Volume + 1}}
+ false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end}}
end.
handle_cast(shutdown, State) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 9fc4ff71..4da247a4 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -36,9 +36,10 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
--export([r/3, r/2, rs/1]).
+-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
@@ -71,16 +72,19 @@
(atom() | amqp_error(), string(), [any()]) -> no_return()).
-spec(protocol_error/4 ::
(atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()).
--spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K)
- when is_subtype(K, atom())).
+-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) ->
+ r(K) when is_subtype(K, atom())).
-spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(),
kind :: K,
name :: '_'}
when is_subtype(K, atom())).
+-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
+ undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(report_cover/0 :: () -> 'ok').
@@ -142,6 +146,8 @@ protocol_error(Error, Explanation, Params, Method) ->
CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
exit({amqp, Error, CompleteExplanation, Method}).
+not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+
get_config(Key) ->
case dirty_read({rabbit_config, Key}) of
{ok, {rabbit_config, Key, V}} -> {ok, V};
@@ -172,6 +178,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) ->
r(VHostPath, Kind) when is_binary(VHostPath) ->
#resource{virtual_host = VHostPath, kind = Kind, name = '_'}.
+r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
+ r_arg(VHostPath, Kind, Table, Key);
+r_arg(VHostPath, Kind, Table, Key) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin);
+ false -> undefined
+ end.
+
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
[Kind, Name, VHostPath])).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d19c37cb..0c573073 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -31,7 +31,7 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0,
+-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
cluster/1, reset/0, force_reset/0]).
-export([table_names/0]).
@@ -47,6 +47,7 @@
-ifdef(use_specs).
-spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]).
+-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
-spec(is_db_empty/0 :: () -> bool()).
@@ -131,8 +132,10 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+dir() -> mnesia:system_info(directory).
+
ensure_mnesia_dir() ->
- MnesiaDir = mnesia:system_info(directory) ++ "/",
+ MnesiaDir = dir() ++ "/",
case filelib:ensure_dir(MnesiaDir) of
{error, Reason} ->
throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
@@ -168,7 +171,7 @@ check_schema_integrity() ->
%% it doesn't.
cluster_nodes_config_filename() ->
- mnesia:system_info(directory) ++ "/cluster_nodes.config".
+ dir() ++ "/cluster_nodes.config".
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
@@ -284,7 +287,7 @@ create_schema() ->
move_db() ->
mnesia:stop(),
- MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"),
+ MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
BackupDir = lists:flatten(
io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
@@ -401,7 +404,7 @@ reset(Force) ->
ok = delete_cluster_nodes_config(),
%% remove persistet messages and any other garbage we find
lists:foreach(fun file:delete/1,
- filelib:wildcard(mnesia:system_info(directory) ++ "/*")),
+ filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index f4fa4599..d0d60ddf 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -259,7 +259,7 @@ log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs},
pending_logs = [Message | Logs]}.
base_filename() ->
- mnesia:system_info(directory) ++ "/rabbit_persister.LOG".
+ rabbit_mnesia:dir() ++ "/rabbit_persister.LOG".
take_snapshot(LogHandle, OldFileName, Snapshot) ->
ok = disk_log:sync(LogHandle),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 988eb651..985ca3e2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -610,9 +610,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = #user{username = Username}},
sock = Sock}) ->
- ok = rabbit_access_control:check_vhost_access(User, VHostPath),
+ ok = rabbit_access_control:check_vhost_access(Username, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
KnownHosts = format_listeners(rabbit_networking:active_listeners()),
Redirects = compute_redirects(Insist),
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 624822d9..0b06a063 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -32,7 +32,7 @@
-module(rabbit_router).
-include("rabbit.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([start_link/0,
deliver/5]).
@@ -58,7 +58,7 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-ifdef(BUG19758).
@@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% than the non-immediate case below.
{ok, lists:flatmap(
fun ({Node, QPids}) ->
- gen_server:cast(
+ gen_server2:cast(
{?SERVER, Node},
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
QPids
@@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
Txn, Message) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server:call(
+ try gen_server2:call(
{?SERVER, Node},
{deliver, QPids, Mandatory, Immediate, Txn, Message},
infinity)
@@ -144,7 +144,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
spawn(
fun () ->
R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
- gen_server:reply(From, R)
+ gen_server2:reply(From, R)
end),
{noreply, State}.