diff options
25 files changed, 833 insertions, 682 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index c938548f..d0a27a36 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -196,7 +196,7 @@ name URL-encoded name of the exchange type - exchange type (B<direct>, B<topic> or B<fanout>) + exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>) durable whether the exchange survives server restarts diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d07aeaf8..a026602a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -62,6 +62,8 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(delivery, {mandatory, immediate, txn, sender, message}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -127,6 +129,12 @@ content :: content(), persistent_key :: maybe(pkey())}). -type(message() :: basic_message()). +-type(delivery() :: + #delivery{mandatory :: bool(), + immediate :: bool(), + txn :: maybe(txn()), + sender :: pid(), + message :: message()}). %% this really should be an abstract type -type(msg_id() :: non_neg_integer()). -type(msg() :: {queue_name(), pid(), msg_id(), bool(), message()}). @@ -136,6 +144,7 @@ host :: string() | atom(), port :: non_neg_integer()}). -type(not_found() :: {'error', 'not_found'}). +-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). -endif. diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 9fe91b98..c74d4533 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -13,12 +13,10 @@ endif ifeq "x$(RPM_OS)" "xsuse" REQUIRES=/sbin/chkconfig /sbin/service -OS_DEFINES=--define '_initrddir /etc/init.d' -RELEASE_OS=.suse +OS_DEFINES=--define '_initrddir /etc/init.d' --define 'dist .suse' else REQUIRES=chkconfig initscripts OS_DEFINES=--define '_initrddir /etc/rc.d/init.d' -RELEASE_OS= endif rpms: clean server @@ -27,7 +25,7 @@ prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp cp $(TOP_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS - sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \ + sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec cp init.d SOURCES/rabbitmq-server.init diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index a9155f3b..77a6a89a 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -8,6 +8,8 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server +# Default-Start: +# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network # Description: RabbitMQ broker diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 6bf3b841..875381e8 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -2,7 +2,7 @@ Name: rabbitmq-server Version: %%VERSION%% -Release: 1%%RELEASE_OS%% +Release: 1%{?dist} License: MPLv1.1 Group: Development/Libraries Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz @@ -34,7 +34,11 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} -make %{?_smp_mflags} + +# The rabbitmq build needs escript, which is missing from /usr/bin in +# some versions of the erlang RPM. See +# <https://bugzilla.redhat.com/show_bug.cgi?id=481302> +PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags} %install rm -rf %{buildroot} @@ -107,12 +111,15 @@ fi %{_rabbit_libdir} %{_initrddir}/rabbitmq-server %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server -%doc LICENSE LICENSE-MPL-RabbitMQ INSTALL +%doc LICENSE LICENSE-MPL-RabbitMQ %clean rm -rf %{buildroot} %changelog +* Tue May 19 2009 Matthias Radestock <matthias@lshift.net> 1.5.5-1 +- Maintenance release for the 1.5.x series + * Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1 - Maintenance release for the 1.5.x series diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index d1ccd3a0..7c5673f7 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.5.5-1) hardy; urgency=low + + * New Upstream Release + + -- Matthias Radestock <matthias@lshift.net> Tue, 19 May 2009 09:57:54 +0100 + rabbitmq-server (1.5.4-1) hardy; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control index b2b3ab02..21636072 100644 --- a/packaging/debs/Debian/debian/control +++ b/packaging/debs/Debian/debian/control @@ -7,7 +7,7 @@ Standards-Version: 3.8.0 Package: rabbitmq-server Architecture: all -Depends: erlang-nox, adduser, logrotate, ${misc:Depends} +Depends: erlang-nox, erlang-os-mon | erlang-nox (<< 1:13.b-dfsg1-1), adduser, logrotate, ${misc:Depends} Description: An AMQP server written in Erlang RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index d9d16dbb..7fb31ad6 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -1,15 +1,15 @@ # -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4 # $Id$ -PortSystem 1.0 - -name rabbitmq-server -version 1.5.3 -categories net -maintainers tonyg@rabbitmq.com -platforms darwin -description The RabbitMQ AMQP Server -long_description \ +PortSystem 1.0 +name rabbitmq-server +version 1.5.3 +revision 0 +categories net +maintainers tonyg@rabbitmq.com +platforms darwin +description The RabbitMQ AMQP Server +long_description \ RabbitMQ is an implementation of AMQP, the emerging standard for \ high performance enterprise messaging. The RabbitMQ server is a \ robust and scalable implementation of an AMQP broker. @@ -32,6 +32,8 @@ set serverhome ${prefix}/var/lib/rabbitmq set logdir ${prefix}/var/log/rabbitmq set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server +set sbindir ${destroot}${prefix}/lib/rabbitmq/bin +set wrappersbin ${destroot}${prefix}/sbin use_configure no @@ -41,7 +43,7 @@ build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ - SBIN_DIR=${destroot}${prefix}/sbin \ + SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man destroot.keepdirs \ @@ -59,32 +61,36 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \ - ${destroot}${prefix}/sbin/rabbitmq-multi \ - ${destroot}${prefix}/sbin/rabbitmq-server \ - ${destroot}${prefix}/sbin/rabbitmqctl + ${sbindir}/rabbitmq-multi \ + ${sbindir}/rabbitmq-server \ + ${sbindir}/rabbitmqctl - file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real - xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin - file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl + xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ + ${wrappersbin}/rabbitmq-multi + + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-multi + file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server + file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - reinplace -E "s:@PREFIX@:${prefix}:" \ - ${destroot}${prefix}/sbin/rabbitmqctl } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper new file mode 100644 index 00000000..b806049c --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-script-wrapper @@ -0,0 +1,23 @@ +#!/bin/bash +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + sudo -u rabbitmq -H /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} +else + /usr/lib/rabbitmq/bin/${SCRIPT} + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper deleted file mode 100644 index 1996811e..00000000 --- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@" diff --git a/packaging/windows/rabbitmq-service.pod b/packaging/windows/rabbitmq-service.pod index 92648076..7c4d3ef2 100644 --- a/packaging/windows/rabbitmq-service.pod +++ b/packaging/windows/rabbitmq-service.pod @@ -92,10 +92,8 @@ Defaults to 5672. =head2 ERLANG_SERVICE_MANAGER_PATH -Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin> -(or F<C:\Program Files (x86)\erl5.5.5\erts-5.5.5\bin> for 64-bit -environments). This is the installation location of the Erlang service -manager. +Defaults to F<C:\Program Files\erl5.5.5\erts-5.5.5\bin>. This is +the installation location of the Erlang service manager. =head2 CLUSTER_CONFIG_FILE diff --git a/src/rabbit.erl b/src/rabbit.erl index 97bbdd99..c8c814b6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -238,10 +238,14 @@ print_banner() -> [Product, Version, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - io:format("Logging to ~p~nSASL logging to ~p~n~n", - [log_location(kernel), log_location(sasl)]). - - + Settings = [{"node", node()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}], + DescrLen = lists:max([length(K) || {K, _V} <- Settings]), + Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", + lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), + io:nl(). start_child(Mod) -> {ok,_} = supervisor:start_child(rabbit_sup, diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 4a0ff79e..54348d9a 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -34,11 +34,12 @@ -include("rabbit.hrl"). -export([check_login/2, user_pass_login/2, - check_vhost_access/2]). + check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, list_users/0, lookup_user/1]). --export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]). --export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]). +-export([add_vhost/1, delete_vhost/1, list_vhosts/0]). +-export([set_permissions/5, clear_permissions/2, + list_vhost_permissions/1, list_user_permissions/1]). %%---------------------------------------------------------------------------- @@ -47,6 +48,8 @@ -spec(check_login/2 :: (binary(), binary()) -> user()). -spec(user_pass_login/2 :: (username(), password()) -> user()). -spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok'). +-spec(check_resource_access/3 :: + (username(), r(atom()), non_neg_integer()) -> 'ok'). -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). @@ -55,10 +58,13 @@ -spec(add_vhost/1 :: (vhost()) -> 'ok'). -spec(delete_vhost/1 :: (vhost()) -> 'ok'). -spec(list_vhosts/0 :: () -> [vhost()]). --spec(list_vhost_users/1 :: (vhost()) -> [username()]). --spec(list_user_vhosts/1 :: (username()) -> [vhost()]). --spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok'). --spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok'). +-spec(set_permissions/5 :: + (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok'). +-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok'). +-spec(list_vhost_permissions/1 :: + (vhost()) -> [{username(), regexp(), regexp(), regexp()}]). +-spec(list_user_permissions/1 :: + (username()) -> [{vhost(), regexp(), regexp(), regexp()}]). -endif. @@ -112,9 +118,9 @@ internal_lookup_vhost_access(Username, VHostPath) -> %% TODO: use dirty ops instead rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:match_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) of + case mnesia:read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of [] -> not_found; [R] -> {ok, R} end @@ -131,13 +137,47 @@ check_vhost_access(#user{username = Username}, VHostPath) -> [VHostPath, Username]) end. +check_resource_access(Username, + R = #resource{kind = exchange, name = <<"">>}, + Permission) -> + check_resource_access(Username, + R#resource{name = <<"amq.default">>}, + Permission); +check_resource_access(_Username, + #resource{name = <<"amq.gen",_/binary>>}, + _Permission) -> + ok; +check_resource_access(Username, + R = #resource{virtual_host = VHostPath, name = Name}, + Permission) -> + Res = case mnesia:dirty_read({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) of + [] -> + false; + [#user_permission{permission = P}] -> + case regexp:match( + binary_to_list(Name), + binary_to_list(element(Permission, P))) of + {match, _, _} -> true; + nomatch -> false + end + end, + if Res -> ok; + true -> rabbit_misc:protocol_error( + access_refused, "access to ~s refused for user '~s'", + [rabbit_misc:rs(R), Username]) + end. + add_user(Username, Password) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({user, Username}) of + case mnesia:wread({rabbit_user, Username}) of [] -> - ok = mnesia:write(#user{username = Username, - password = Password}); + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write); _ -> mnesia:abort({user_already_exists, Username}) end @@ -150,10 +190,6 @@ delete_user(Username) -> rabbit_misc:with_user( Username, fun () -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_access_control.erl - ok = mnesia:delete({user, Username}), - ok = mnesia:delete({user_vhost, Username}) -======= ok = mnesia:delete({rabbit_user, Username}), [ok = mnesia:delete_object( rabbit_user_permission, R, write) || @@ -165,7 +201,6 @@ delete_user(Username) -> permission = '_'}, write)], ok ->>>>>>> /tmp/rabbit_access_control.erl~other.wd6AMx end)), rabbit_log:info("Deleted user ~p~n", [Username]), R. @@ -175,24 +210,28 @@ change_password(Username, Password) -> rabbit_misc:with_user( Username, fun () -> - ok = mnesia:write(#user{username = Username, - password = Password}) + ok = mnesia:write(rabbit_user, + #user{username = Username, + password = Password}, + write) end)), rabbit_log:info("Changed password for user ~p~n", [Username]), R. list_users() -> - mnesia:dirty_all_keys(user). + mnesia:dirty_all_keys(rabbit_user). lookup_user(Username) -> - rabbit_misc:dirty_read({user, Username}). + rabbit_misc:dirty_read({rabbit_user, Username}). add_vhost(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:read({vhost, VHostPath}) of + case mnesia:wread({rabbit_vhost, VHostPath}) of [] -> - ok = mnesia:write(#vhost{virtual_host = VHostPath}), + ok = mnesia:write(rabbit_vhost, + #vhost{virtual_host = VHostPath}, + write), [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || @@ -200,6 +239,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> @@ -232,53 +273,79 @@ internal_delete_vhost(VHostPath) -> ok = rabbit_exchange:delete(Name, false) end, rabbit_exchange:list(VHostPath)), - lists:foreach(fun (Username) -> - ok = unmap_user_vhost(Username, VHostPath) + lists:foreach(fun ({Username, _, _, _}) -> + ok = clear_permissions(Username, VHostPath) end, - list_vhost_users(VHostPath)), - ok = mnesia:delete({vhost, VHostPath}), + list_vhost_permissions(VHostPath)), + ok = mnesia:delete({rabbit_vhost, VHostPath}), ok. list_vhosts() -> - mnesia:dirty_all_keys(vhost). + mnesia:dirty_all_keys(rabbit_vhost). -list_vhost_users(VHostPath) -> - [Username || - #user_vhost{username = Username} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_vhost( - VHostPath, - fun () -> mnesia:index_read(user_vhost, VHostPath, - #user_vhost.virtual_host) - end))]. - -list_user_vhosts(Username) -> - [VHostPath || - #user_vhost{virtual_host = VHostPath} <- - %% TODO: use dirty ops instead - rabbit_misc:execute_mnesia_transaction( - rabbit_misc:with_user( - Username, - fun () -> mnesia:read({user_vhost, Username}) end))]. +validate_regexp(RegexpBin) -> + Regexp = binary_to_list(RegexpBin), + case regexp:parse(Regexp) of + {ok, _} -> ok; + {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}}) + end. -map_user_vhost(Username, VHostPath) -> +set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) -> + lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]), rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, - fun () -> - ok = mnesia:write( - #user_vhost{username = Username, - virtual_host = VHostPath}) + fun () -> ok = mnesia:write( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}}, + write) end)). -unmap_user_vhost(Username, VHostPath) -> +clear_permissions(Username, VHostPath) -> rabbit_misc:execute_mnesia_transaction( rabbit_misc:with_user_and_vhost( Username, VHostPath, fun () -> - ok = mnesia:delete_object( - #user_vhost{username = Username, - virtual_host = VHostPath}) + ok = mnesia:delete({rabbit_user_permission, + #user_vhost{username = Username, + virtual_host = VHostPath}}) end)). +list_vhost_permissions(VHostPath) -> + [{Username, ConfigurePerm, WritePerm, ReadPerm} || + {Username, _, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_vhost( + VHostPath, match_user_vhost('_', VHostPath)))]. + +list_user_permissions(Username) -> + [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <- + list_permissions(rabbit_misc:with_user( + Username, match_user_vhost(Username, '_')))]. + +list_permissions(QueryThunk) -> + [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} || + #user_permission{user_vhost = #user_vhost{username = Username, + virtual_host = VHostPath}, + permission = #permission{ + configure = ConfigurePerm, + write = WritePerm, + read = ReadPerm}} <- + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction(QueryThunk)]. + +match_user_vhost(Username, VHostPath) -> + fun () -> mnesia:match_object( + rabbit_user_permission, + #user_permission{user_vhost = #user_vhost{ + username = Username, + virtual_host = VHostPath}, + permission = '_'}, + read) + end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9b79cc15..eb076e94 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,18 +32,18 @@ -module(rabbit_amqqueue). -export([start/0, recover/0, declare/4, delete/3, purge/1, internal_delete/1]). --export([conserve_memory/1, conserve_memory/2, pseudo_queue/2]). +-export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). --export([notify_sent/2]). --export([commit_all/2, rollback_all/2, notify_down_all/2]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). +-export([notify_sent/2, unblock/2]). +-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -import(mnesia). --import(gen_server). +-import(gen_server2). -import(lists). -import(queue). @@ -64,8 +64,6 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(conserve_memory/1 :: (bool()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). -spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). @@ -90,18 +88,20 @@ -spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). --spec(commit/2 :: (pid(), txn()) -> 'ok' | {'error', any()}). --spec(rollback/2 :: (pid(), txn()) -> 'ok' | {'error', any()}). +-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). +-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). +-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -124,21 +124,6 @@ recover() -> recover_durable_queues() -> Node = node(), -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). -======= lists:foreach( fun (RecoveredQ) -> Q = start_queue_process(RecoveredQ), @@ -165,20 +150,6 @@ recover_durable_queues() -> node(Pid) == Node])) end)), ok. ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH - -conserve_memory(Conserve) -> - [ok = gen_server:cast(QPid, {conserve_memory, Conserve}) || - {_, QPid, worker, _} <- - supervisor:which_children(rabbit_amqqueue_sup)], - ok. - -conserve_memory(QPid, Conserve) -> - %% This needs to be synchronous. It is called during queue - %% creation and we need to make sure that the memory conservation - %% status of the queue has been set before it becomes reachable in - %% message routing. - gen_server:call(QPid, {conserve_memory, Conserve}). declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -188,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) -> pid = none}), case rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({amqqueue, QueueName}) of + case mnesia:wread({rabbit_queue, QueueName}) of [] -> ok = store_queue(Q), ok = add_default_binding(Q), Q; @@ -201,16 +172,15 @@ declare(QueueName, Durable, AutoDelete, Args) -> end. store_queue(Q = #amqqueue{durable = true}) -> - ok = mnesia:write(durable_queues, Q, write), - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_durable_queue, Q, write), + ok = mnesia:write(rabbit_queue, Q, write), ok; store_queue(Q = #amqqueue{durable = false}) -> - ok = mnesia:write(Q), + ok = mnesia:write(rabbit_queue, Q, write), ok. start_queue_process(Q) -> {ok, Pid} = supervisor:start_child(rabbit_amqqueue_sup, [Q]), - ok = rabbit_alarm:maybe_conserve_memory(Pid), Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> @@ -220,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) -> ok. lookup(Name) -> - rabbit_misc:dirty_read({amqqueue, Name}). + rabbit_misc:dirty_read({rabbit_queue, Name}). with(Name, F, E) -> case lookup(Name) of @@ -237,23 +207,16 @@ with_or_die(Name, F) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_queue, #amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, info). -======= gen_server2:pcall(QPid, 9, info, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH info(#amqqueue{ pid = QPid }, Items) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - case gen_server:call(QPid, {info, Items}) of -======= case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -262,68 +225,44 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl -stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat). -======= stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH stat_all() -> - lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)). + lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {delete, IfUnused, IfEmpty}). -======= gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl -purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge). -======= purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH deliver(_IsMandatory, true, Txn, Message, QPid) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {deliver_immediately, Txn, Message}); -======= gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity); ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH deliver(true, _IsImmediate, Txn, Message, QPid) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {deliver, Txn, Message}), -======= gen_server2:call(QPid, {deliver, Txn, Message}, infinity), ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH true; deliver(false, _IsImmediate, Txn, Message, QPid) -> - gen_server:cast(QPid, {deliver, Txn, Message}), + gen_server2:cast(QPid, {deliver, Txn, Message}), true. redeliver(QPid, Messages) -> - gen_server:cast(QPid, {redeliver, Messages}). + gen_server2:cast(QPid, {redeliver, Messages}). requeue(QPid, MsgIds, ChPid) -> - gen_server:cast(QPid, {requeue, MsgIds, ChPid}). + gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}). + gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end, -======= fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end, ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH QPids). rollback_all(QPids, Txn) -> safe_pmap_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, - fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end, + fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end, QPids). notify_down_all(QPids, ChPid) -> @@ -331,78 +270,50 @@ notify_down_all(QPids, ChPid) -> %% we don't care if the queue process has terminated in the %% meantime fun (_) -> ok end, -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end, -======= fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end, ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH QPids). +limit_all(QPids, ChPid, LimiterPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end, + QPids). + claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {claim_queue, ReaderPid}). -======= gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {basic_get, ChPid, NoAck}). -======= gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). -======= gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). -======= ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, infinity). ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH notify_sent(QPid, ChPid) -> - gen_server:cast(QPid, {notify_sent, ChPid}). + gen_server2:cast(QPid, {notify_sent, ChPid}). + +unblock(QPid, ChPid) -> + gen_server2:cast(QPid, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - case mnesia:wread({amqqueue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), - ok = mnesia:delete({durable_queues, QueueName}), -======= case mnesia:wread({rabbit_queue, QueueName}) of [] -> {error, not_found}; [_] -> ok = rabbit_exchange:delete_queue_bindings(QueueName), ok = mnesia:delete({rabbit_queue, QueueName}), ok = mnesia:delete({rabbit_durable_queue, QueueName}), ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH ok end end). -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), - ok. - -======= ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> @@ -414,15 +325,9 @@ on_node_down(Node) -> Acc end, ok, -<<<<<<< /tmp/rabbitmq-server/src/rabbit_amqqueue.erl - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(amqqueue), - node(Pid) == Node])) -======= qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} <- mnesia:table(rabbit_queue), node(Pid) == Node])) ->>>>>>> /tmp/rabbit_amqqueue.erl~other.MhI2TH end). pseudo_queue(QueueName, Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7919be52..c390b2b7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). -define(HIBERNATE_AFTER, 1000). @@ -51,23 +51,21 @@ owner, exclusive_consumer, has_had_consumers, - conserve_memory, - dropped_message_count, next_msg_id, message_buffer, round_robin}). -record(consumer, {tag, ack_required}). --record(tx, {ch_pid, is_persistent, fail_reason, - pending_messages, pending_acks}). +-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). %% These are held in our process dictionary -record(cr, {consumers, ch_pid, + limiter_pid, monitor_ref, unacked_messages, - is_overload_protection_active, + is_limit_active, unsent_message_count}). -define(INFO_KEYS, @@ -88,7 +86,7 @@ %%---------------------------------------------------------------------------- start_link(Q) -> - gen_server:start_link(?MODULE, Q, []). + gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- @@ -98,16 +96,13 @@ init(Q) -> owner = none, exclusive_consumer = none, has_had_consumers = false, - conserve_memory = false, - dropped_message_count = 0, next_msg_id = 1, message_buffer = queue:new(), round_robin = queue:new()}, ?HIBERNATE_AFTER}. -terminate(_Reason, State = #q{dropped_message_count = C}) -> +terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? QName = qname(State), - log_dropped_message_count(QName, C), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, all_tx()), ok = purge_message_buffer(QName, State#q.message_buffer), @@ -118,20 +113,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -log_dropped_message_count(_QName, 0) -> - ok; -log_dropped_message_count(QName, C) -> - rabbit_log:warning("~s dropped ~p messages to conserve memory~n", - [rabbit_misc:rs(QName), C]), - ok. - -conserve_memory(false, State = #q{q = #amqqueue{name = QName}, - conserve_memory = true, - dropped_message_count = C}) -> - log_dropped_message_count(QName, C), - State#q{conserve_memory = false, dropped_message_count = 0}; -conserve_memory(Conserve, State) -> - State#q{conserve_memory = Conserve}. +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -148,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -161,45 +145,18 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -<<<<<<< local is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. -======= -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, - unsent_message_count = Count}) -> - {Result, NewActive} = - if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; - true -> - {ok, Active} - end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), - Result. ->>>>>>> other - -<<<<<<< local -increment_dropped_message_count(State) -> - State#q{dropped_message_count = State#q.dropped_message_count + 1}. - -find_auto_ack_consumer(RoundRobin, RoundRobinNew) -> - case queue:out(RoundRobin) of - {{value, QEntry = {_, #consumer{ack_required = AckRequired}}}, - RoundRobinTail} -> - case AckRequired of - true -> find_auto_ack_consumer( - RoundRobinTail, - queue:in(QEntry, RoundRobinNew)); - false -> {QEntry, queue:join(RoundRobinNew, RoundRobinTail)} - end; - {empty, _} -> false + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok end. -======= + deliver_immediately(Message, Delivered, ->>>>>>> other State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, next_msg_id = NextId}) -> @@ -208,117 +165,60 @@ deliver_immediately(Message, Delivered, {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), -<<<<<<< local case not(AckRequired) orelse rabbit_limiter:can_send( - -deliver_to_consumer(Message, Delivered, QName, MsgId, - QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, - RoundRobinTail) -> - ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), MsgId, Delivered, Message}), - C = #cr{unsent_message_count = Count, unacked_messages = UAM} = - ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(MsgId, Message, UAM); - false -> UAM - end, - NewConsumers = case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {AckRequired, NewConsumers}. - -deliver_immediately(Message, Delivered, true, - State = #q{q = #amqqueue{name = QName}, - round_robin = RoundRobin, - next_msg_id = NextId}) -> - case queue:is_empty(RoundRobin) of - true -> {not_offered, State}; - false -> case find_auto_ack_consumer(RoundRobin, queue:new()) of - false -> - {not_offered, - increment_dropped_message_count(State)}; - {QEntry, RoundRobinTail} -> - {AckRequired, NewRoundRobin} = - deliver_to_consumer( - Message, Delivered, QName, NextId, - QEntry, RoundRobinTail), - {offered, AckRequired, - State#q{round_robin = NewRoundRobin, - next_msg_id = NextId + 1}} - end - end; -deliver_immediately(Message, Delivered, false, -<<<<<<< local - {{value, QEntry}, RoundRobinTail} -> - {AckRequired, NewRoundRobin} = - deliver_to_consumer(Message, Delivered, QName, NextId, - QEntry, RoundRobinTail), - {offered, AckRequired, State#q{round_robin = NewRoundRobin, - next_msg_id = NextId + 1}}; -======= ->>>>>>> other + LimiterPid, self()) of + true -> + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; false -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) end; -======= - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; ->>>>>>> other {empty, _} -> - not_offered + {not_offered, State} end. -attempt_delivery(none, Message, State = #q{conserve_memory = Conserve}) -> - case deliver_immediately(Message, false, Conserve, State) of +attempt_delivery(none, Message, State) -> + case deliver_immediately(Message, false, State) of {offered, false, State1} -> {true, State1}; {offered, true, State1} -> persist_message(none, qname(State), Message), persist_delivery(qname(State), Message, false), {true, State1}; - not_offered -> - {false, State} + {not_offered, State1} -> + {false, State1} end; -attempt_delivery(Txn, Message, State = #q{conserve_memory = false}) -> +attempt_delivery(Txn, Message, State) -> persist_message(Txn, qname(State), Message), record_pending_message(Txn, Message), - {true, State}; -attempt_delivery(Txn, _Message, State = #q{conserve_memory = true}) -> - mark_tx_failed(Txn, dropped_messages_to_conserve_memory), - {false, increment_dropped_message_count(State)}. + {true, State}. deliver_or_enqueue(Txn, Message, State) -> case attempt_delivery(Txn, Message, State) of {true, NewState} -> {true, NewState}; - {false, NewState = #q{conserve_memory = true}} -> - {false, NewState}; - {false, NewState = #q{conserve_memory = false}} -> + {false, NewState} -> persist_message(Txn, qname(State), Message), NewMB = queue:in({Message, false}, NewState#q.message_buffer), {false, NewState#q{message_buffer = NewMB}} @@ -345,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -409,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, {stop, normal, NewState} end end. - + cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) -> none; cancel_holder(_ChPid, _ConsumerTag, Holder) -> @@ -435,15 +341,15 @@ run_poke_burst(State = #q{message_buffer = MessageBuffer}) -> run_poke_burst(MessageBuffer, State) -> case queue:out(MessageBuffer) of {{value, {Message, Delivered}}, BufferTail} -> - case deliver_immediately(Message, Delivered, false, State) of + case deliver_immediately(Message, Delivered, State) of {offered, true, NewState} -> persist_delivery(qname(State), Message, Delivered), run_poke_burst(BufferTail, NewState); {offered, false, NewState} -> persist_auto_ack(qname(State), Message), run_poke_burst(BufferTail, NewState); - not_offered -> - State#q{message_buffer = MessageBuffer} + {not_offered, NewState} -> + NewState#q{message_buffer = MessageBuffer} end; {empty, _} -> State#q{message_buffer = MessageBuffer} @@ -525,7 +431,6 @@ lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx{ch_pid = none, is_persistent = false, - fail_reason = none, pending_messages = [], pending_acks = []}; V -> V @@ -551,14 +456,6 @@ is_tx_persistent(Txn) -> #tx{is_persistent = Res} = lookup_tx(Txn), Res. -mark_tx_failed(Txn, Reason) -> - Tx = lookup_tx(Txn), - store_tx(Txn, Tx#tx{fail_reason = Reason}). - -tx_fail_reason(Txn) -> - #tx{fail_reason = Res} = lookup_tx(Txn), - Res. - record_pending_message(Txn, Message) -> Tx = #tx{pending_messages = Pending} = lookup_tx(Txn), store_tx(Txn, Tx#tx{pending_messages = [{Message, false} | Pending]}). @@ -617,8 +514,8 @@ i(messages_uncommitted, _) -> #tx{pending_messages = Pending} <- all_tx_record()]); i(messages, State) -> lists:sum([i(Item, State) || Item <- [messages_ready, - messages_unacknowledged, - messages_uncommitted]]); + messages_unacknowledged, + messages_uncommitted]]); i(acks_uncommitted, _) -> lists:sum([length(Pending) || #tx{pending_acks = Pending} <- all_tx_record()]); @@ -635,8 +532,8 @@ i(Item, _) -> %--------------------------------------------------------------------------- -handle_call({conserve_memory, Conserve}, _From, State) -> - {reply, ok, conserve_memory(Conserve, State)}; +handle_call(info, _From, State) -> + reply(infos(?INFO_KEYS, State), State); handle_call({info, Items}, _From, State) -> try @@ -667,30 +564,16 @@ handle_call({deliver, Txn, Message}, _From, State) -> reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> -<<<<<<< local - NewState = - case tx_fail_reason(Txn) of - none -> ok = commit_work(Txn, qname(State)), - %% optimisation: we reply straight away so the - %% sender can continue - gen_server:reply(From, ok), - process_pending(Txn, State); - Reason -> ok = rollback_work(Txn, qname(State)), - gen_server:reply(From, {error, Reason}), - State - end, -======= ok = commit_work(Txn, qname(State)), %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), NewState = process_pending(Txn, State), ->>>>>>> other erase_tx(Txn), noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue - gen_server:reply(From, ok), + gen_server2:reply(From, ok), handle_ch_down(ChPid, State); handle_call({basic_get, ChPid, NoAck}, _From, @@ -717,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, _From, State = #q{owner = Owner, exclusive_consumer = ExistingHolder, round_robin = RoundRobin}) -> @@ -732,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers]}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), + if Consumers == [] -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, State1 = State#q{has_had_consumers = true, exclusive_consumer = if @@ -753,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumers = Consumers} -> + C = #cr{consumers = Consumers, limiter_pid = LimiterPid} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), + if NewConsumers == [] -> + ok = rabbit_limiter:unregister(LimiterPid, self()); + true -> + ok + end, ok = maybe_send_reply(ChPid, OkMsg), case check_auto_delete( State#q{exclusive_consumer = cancel_holder(ChPid, @@ -819,9 +711,6 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(locked, State) end. -handle_cast({conserve_memory, Conserve}, State) -> - {noreply, conserve_memory(Conserve, State)}; - handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), @@ -864,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); + handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)); + +handle_cast({limit, ChPid, LimiterPid}, State) -> + noreply( + possibly_unblock( + State, ChPid, + fun (C = #cr{consumers = Consumers, + limiter_pid = OldLimiterPid, + is_limit_active = Limited}) -> + if Consumers =/= [] andalso OldLimiterPid == undefined -> + ok = rabbit_limiter:register(LimiterPid, self()); + true -> + ok + end, + NewLimited = Limited andalso LimiterPid =/= undefined, + C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -892,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]); + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl new file mode 100644 index 00000000..761b3863 --- /dev/null +++ b/src/rabbit_basic.erl @@ -0,0 +1,75 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_basic). +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +-export([publish/1, message/4, delivery/4]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(publish/1 :: (delivery()) -> + {ok, routing_result(), [pid()]} | not_found()). +-spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). +-spec(message/4 :: (exchange_name(), routing_key(), binary(), binary()) -> + message()). + +-endif. + +%%---------------------------------------------------------------------------- + +publish(Delivery = #delivery{ + message = #basic_message{exchange_name = ExchangeName}}) -> + case rabbit_exchange:lookup(ExchangeName) of + {ok, X} -> + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}; + Other -> + Other + end. + +delivery(Mandatory, Immediate, Txn, Message) -> + #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, + sender = self(), message = Message}. + +message(ExchangeName, RoutingKeyBin, ContentTypeBin, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + Content = #content{class_id = ClassId, + properties = #'P_basic'{content_type = ContentTypeBin}, + properties_bin = none, + payload_fragments_rev = [BodyBin]}, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKeyBin, + content = Content, + persistent_key = none}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5fd9a512..b2716ec4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -33,23 +33,29 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/4, do/2, do/3, shutdown/1]). +-behaviour(gen_server2). + +-export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). -%% callbacks --export([init/2, handle_message/2]). +-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --record(ch, {state, proxy_pid, reader_pid, writer_pid, +-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping}). +-define(HIBERNATE_AFTER, 1000). + +-define(MAX_PERMISSION_CACHE_SIZE, 12). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()). +-spec(start_link/5 :: + (channel_number(), pid(), pid(), username(), vhost()) -> pid()). -spec(do/2 :: (pid(), amqp_method()) -> 'ok'). -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). @@ -61,112 +67,126 @@ %%---------------------------------------------------------------------------- -start_link(ReaderPid, WriterPid, Username, VHost) -> - buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid, - Username, VHost]). +start_link(Channel, ReaderPid, WriterPid, Username, VHost) -> + {ok, Pid} = gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost], []), + Pid. do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - Pid ! {method, Method, Content}, - ok. + gen_server2:cast(Pid, {method, Method, Content}). shutdown(Pid) -> - Pid ! terminate, - ok. + gen_server2:cast(Pid, terminate). send_command(Pid, Msg) -> - Pid ! {command, Msg}, - ok. + gen_server2:cast(Pid, {command, Msg}). deliver(Pid, ConsumerTag, AckRequired, Msg) -> - Pid ! {deliver, ConsumerTag, AckRequired, Msg}, - ok. + gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, - ok. + gen_server2:cast(Pid, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- -init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> +init([Channel, ReaderPid, WriterPid, Username, VHost]) -> process_flag(trap_exit, true), link(WriterPid), - %% this is bypassing the proxy so alarms can "jump the queue" and - %% be handled promptly rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - #ch{state = starting, - proxy_pid = ProxyPid, - reader_pid = ReaderPid, - writer_pid = WriterPid, - transaction_id = none, - tx_participants = sets:new(), - next_tag = 1, - uncommitted_ack_q = queue:new(), - unacked_message_q = queue:new(), - username = Username, - virtual_host = VHost, - most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}. - -handle_message({method, Method, Content}, State) -> + {ok, #ch{state = starting, + channel = Channel, + reader_pid = ReaderPid, + writer_pid = WriterPid, + limiter_pid = undefined, + transaction_id = none, + tx_participants = sets:new(), + next_tag = 1, + uncommitted_ack_q = queue:new(), + unacked_message_q = queue:new(), + username = Username, + virtual_host = VHost, + most_recently_declared_queue = <<>>, + consumer_mapping = dict:new()}}. + +handle_call(_Request, _From, State) -> + noreply(State). + +handle_cast({method, Method, Content}, State) -> try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), - NewState; + noreply(NewState); {noreply, NewState} -> - NewState; + noreply(NewState); stop -> - exit(normal) + {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - terminate({amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State); + ok = notify_queues(internal_rollback(State)), + Reason = {amqp, Error, Explanation, + rabbit_misc:method_record_type(Method)}, + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State#ch{state = terminating}}; exit:normal -> - terminate(normal, State); + {stop, normal, State}; _:Reason -> - terminate({Reason, erlang:get_stacktrace()}, State) + {stop, {Reason, erlang:get_stacktrace()}, State} end; -handle_message(terminate, State) -> - terminate(normal, State); +handle_cast(terminate, State) -> + {stop, normal, State}; -handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) -> +handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), - State; + noreply(State); -handle_message({deliver, ConsumerTag, AckRequired, Msg}, - State = #ch{proxy_pid = ProxyPid, - writer_pid = WriterPid, - next_tag = DeliveryTag}) -> +handle_cast({deliver, ConsumerTag, AckRequired, Msg}, + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, ProxyPid, - true, ConsumerTag, DeliveryTag, Msg), - State1#ch{next_tag = DeliveryTag + 1}; + ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), + noreply(State1#ch{next_tag = DeliveryTag + 1}); -handle_message({conserve_memory, Conserve}, State) -> +handle_cast({conserve_memory, Conserve}, State) -> + ok = clear_permission_cache(), ok = rabbit_writer:send_command( State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), - State; + noreply(State). -handle_message({'EXIT', _Pid, Reason}, State) -> - terminate(Reason, State); +handle_info({'EXIT', _Pid, Reason}, State) -> + {stop, Reason, State}; -handle_message(Other, State) -> - terminate({unexpected_channel_message, Other}, State). +handle_info(timeout, State) -> + ok = clear_permission_cache(), + %% TODO: Once we drop support for R11B-5, we can change this to + %% {noreply, State, hibernate}; + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). -%%--------------------------------------------------------------------------- +terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, + state = terminating}) -> + rabbit_writer:shutdown(WriterPid), + rabbit_limiter:shutdown(LimiterPid); -terminate(Reason, State = #ch{writer_pid = WriterPid}) -> +terminate(Reason, State = #ch{writer_pid = WriterPid, + limiter_pid = LimiterPid}) -> Res = notify_queues(internal_rollback(State)), case Reason of normal -> ok = Res; _ -> ok end, rabbit_writer:shutdown(WriterPid), - exit(Reason). + rabbit_limiter:shutdown(LimiterPid). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%--------------------------------------------------------------------------- + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) -> {reply, Reply, NewState} end. +check_resource_access(Username, Resource, Perm) -> + V = {Resource, Perm}, + Cache = case get(permission_cache) of + undefined -> []; + Other -> Other + end, + CacheTail = + case lists:member(V, Cache) of + true -> lists:delete(V, Cache); + false -> ok = rabbit_access_control:check_resource_access( + Username, Resource, Perm), + lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1) + end, + put(permission_cache, [V | CacheTail]), + ok. + +clear_permission_cache() -> + erase(permission_cache), + ok. + +check_configure_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.configure). + +check_write_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.write). + +check_read_permitted(Resource, #ch{ username = Username}) -> + check_resource_access(Username, Resource, #permission.read). + expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( not_allowed, "no previously declared queue", []); @@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) -> handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> ok = notify_queues(internal_rollback(State)), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), - ok = rabbit_writer:shutdown(WriterPid), stop; handle_method(#'access.request'{},_, State) -> @@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{ virtual_host = VHostPath}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. @@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, @@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, true -> ok end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(State#ch.proxy_pid, TxnKey, Acked), + Participants = ack(TxnKey, Acked), {noreply, case TxnKey of - none -> State#ch{unacked_message_q = Remaining}; + none -> ok = notify_limiter(State#ch.limiter_pid, Acked), + State#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( @@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid, + _, State = #ch{ writer_pid = WriterPid, next_tag = DeliveryTag }) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, - fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of + fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, Msg = {_QName, _QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, @@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_ack = NoAck, exclusive = ExclusiveConsume, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - reader_pid = ReaderPid, + _, State = #ch{ reader_pid = ReaderPid, + limiter_pid = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), ActualConsumerTag = case ConsumerTag of <<>> -> rabbit_guid:binstring_guid("amq.ctag"); @@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, self(), LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, - _, State = #ch{ proxy_pid = ProxyPid, - consumer_mapping = ConsumerMapping }) -> + _, State = #ch{consumer_mapping = ConsumerMapping }) -> OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag}, case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, %% cancel_ok ourselves it might overtake a %% message sent previously by the queue. rabbit_amqqueue:basic_cancel( - Q, ProxyPid, ConsumerTag, + Q, self(), ConsumerTag, ok_msg(NoWait, #'basic.cancel_ok'{ consumer_tag = ConsumerTag})) end) of @@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS - {reply, #'basic.qos_ok'{}, State}; +handle_method(#'basic.qos'{global = true}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "global=true", []); + +handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> + rabbit_misc:protocol_error(not_implemented, + "prefetch_size!=0 (~w)", [Size]); + +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{ limiter_pid = LimiterPid }) -> + NewLimiterPid = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> + undefined; + {undefined, _} -> + LPid = rabbit_limiter:start_link(self()), + ok = limit_queues(LPid, State), + LPid; + {_, 0} -> + ok = rabbit_limiter:shutdown(LimiterPid), + ok = limit_queues(undefined, State), + undefined; + {_, _} -> + LimiterPid + end, + ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, unacked_message_q = UAMQ }) -> ok = fold_per_queue( fun (QPid, MsgIds, ok) -> @@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true}, %% order. To keep it happy we reverse the id list %% since we are given them in reverse order. rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), ProxyPid) + QPid, lists:reverse(MsgIds), self()) end, ok, UAMQ), %% No answer required, apparently! {noreply, State#ch{unacked_message_q = queue:new()}}; handle_method(#'basic.recover'{requeue = false}, _, State = #ch{ transaction_id = none, - proxy_pid = ProxyPid, writer_pid = WriterPid, unacked_message_q = UAMQ }) -> lists:foreach( @@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false}, %% %% FIXME: should we allocate a fresh DeliveryTag? ok = internal_deliver( - WriterPid, ProxyPid, - false, ConsumerTag, DeliveryTag, + WriterPid, false, ConsumerTag, DeliveryTag, {QName, QPid, MsgId, true, Message}) end, queue:to_list(UAMQ)), %% No answer required, apparently! @@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, _, State = #ch{ virtual_host = VHostPath }) -> CheckedType = rabbit_exchange:check_type(TypeNameBin), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), X = case rabbit_exchange:lookup(ExchangeName) of {ok, FoundX} -> FoundX; {error, not_found} -> @@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), X = rabbit_exchange:lookup_or_die(ExchangeName), ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)), return_ok(State, NoWait, #'exchange.declare_ok'{}); @@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, nowait = NoWait}, _, State = #ch { virtual_host = VHostPath }) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_configure_permitted(ExchangeName, State), case rabbit_exchange:delete(ExchangeName, IfUnused) of {error, not_found} -> rabbit_misc:protocol_error( @@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin, Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + check_configure_permitted(QueueName, State), Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args)); - Other -> Other + Other = #amqqueue{name = QueueName} -> + check_configure_permitted(QueueName, State), + Other end, return_queue_declare_ok(State, NoWait, Q); @@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ virtual_host = VHostPath }) -> QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin), + check_configure_permitted(QueueName, State), Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end), return_queue_declare_ok(State, NoWait, Q); @@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin, }, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_configure_permitted(QueueName, State), case rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of @@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin, nowait = NoWait}, _, State) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_read_permitted(QueueName, State), {ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die( QueueName, fun (Q) -> rabbit_amqqueue:purge(Q) end), @@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, %% FIXME: don't allow binding to internal exchanges - %% including the one named "" ! QueueName = expand_queue_name_shortcut(QueueNameBin, State), + check_write_permitted(QueueName, State), ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), + check_read_permitted(ExchangeName, State), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of {error, exchange_not_found} -> rabbit_misc:protocol_error( @@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> State#ch{tx_participants = sets:union(Participants, sets:from_list(MoreP))}. -ack(ProxyPid, TxnKey, UAQ) -> +ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> - ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid), + ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), [QPid | L] end, [], UAQ). @@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey, tx_participants = Participants}) -> case rabbit_amqqueue:commit_all(sets:to_list(Participants), TxnKey) of - ok -> new_tx(State); + ok -> ok = notify_limiter(State#ch.limiter_pid, + State#ch.uncommitted_ack_q), + new_tx(State); {error, Errors} -> rabbit_misc:protocol_error( internal_error, "commit failed: ~w", [Errors]) end. @@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). -notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) -> - rabbit_amqqueue:notify_down_all( - [QPid || QueueName <- - sets:to_list( - dict:fold(fun (_ConsumerTag, QueueName, S) -> - sets:add_element(QueueName, S) - end, sets:new(), Consumers)), - case rabbit_amqqueue:lookup(QueueName) of - {ok, Q} -> QPid = Q#amqqueue.pid, true; - %% queue has been deleted in the meantime - {error, not_found} -> QPid = none, false - end], - ProxyPid). +notify_queues(#ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). + +limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> + rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). + +consumer_queues(Consumers) -> + [QPid || QueueName <- + sets:to_list( + dict:fold(fun (_ConsumerTag, QueueName, S) -> + sets:add_element(QueueName, S) + end, sets:new(), Consumers)), + case rabbit_amqqueue:lookup(QueueName) of + {ok, Q} -> QPid = Q#amqqueue.pid, true; + %% queue has been deleted in the meantime + {error, not_found} -> QPid = none, false + end]. + +%% tell the limiter about the number of acks that have been received +%% for messages delivered to subscribed consumers, but not acks for +%% messages sent in a response to a basic.get (identified by their +%% 'none' consumer tag) +notify_limiter(undefined, _Acked) -> + ok; +notify_limiter(LimiterPid, Acked) -> + case lists:foldl(fun ({_, none, _}, Acc) -> Acc; + ({_, _, _}, Acc) -> Acc + 1 + end, 0, queue:to_list(Acked)) of + 0 -> ok; + Count -> rabbit_limiter:ack(LimiterPid, Count) + end. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> @@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{ 1 -> false; 2 -> true; undefined -> false; - Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n", + Other -> rabbit_log:warning("Unknown delivery mode ~p - " + "treating as 1, non-persistent~n", [Other]), false end. @@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, +internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag, routing_key = RoutingKey}, ok = case Notify of true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, ChPid, M, Content); + WriterPid, QPid, self(), M, Content); false -> rabbit_writer:send_command(WriterPid, M, Content) end. diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index dc5824f1..76016a8c 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -74,7 +74,11 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> - {ok, _QueueNames} = rabbit_exchange:simple_publish( - false, false, LogExch, RoutingKey, <<"text/plain">>, - list_to_binary(io_lib:format(Format, Data))), + {ok, _RoutingRes, _DeliveredQPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, none, + rabbit_basic:message( + LogExch, RoutingKey, <<"text/plain">>, + list_to_binary(io_lib:format(Format, Data))))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index e0e3113f..3b6338c7 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,16 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl --export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). -======= -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym %% EXTENDED API -export([list_exchange_bindings/1]). @@ -84,7 +79,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -96,6 +91,7 @@ -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -110,24 +106,6 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), - Acc - end, ok, durable_routes), - ok - end). -======= ok = rabbit_misc:table_foreach( fun(Exchange) -> ok = mnesia:write(rabbit_exchange, Exchange, write) @@ -139,7 +117,6 @@ recover() -> ok = mnesia:write(rabbit_reverse_route, ReverseRoute, write) end, rabbit_durable_route). ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -149,11 +126,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> arguments = Args}, rabbit_misc:execute_mnesia_transaction( fun () -> - case mnesia:wread({exchange, ExchangeName}) of - [] -> ok = mnesia:write(Exchange), + case mnesia:wread({rabbit_exchange, ExchangeName}) of + [] -> ok = mnesia:write(rabbit_exchange, Exchange, write), if Durable -> - ok = mnesia:write( - durable_exchanges, Exchange, write); + ok = mnesia:write(rabbit_durable_exchange, + Exchange, write); true -> ok end, Exchange; @@ -167,6 +144,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -180,7 +159,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> [rabbit_misc:rs(Name), ActualType, RequiredType]). lookup(Name) -> - rabbit_misc:dirty_read({exchange, Name}). + rabbit_misc:dirty_read({rabbit_exchange, Name}). lookup_or_die(Name) -> case lookup(Name) of @@ -192,6 +171,7 @@ lookup_or_die(Name) -> list(VHostPath) -> mnesia:dirty_match_object( + rabbit_exchange, #exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}). map(VHostPath, F) -> @@ -233,64 +213,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(rabbit_route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, _ = '_'}}, - lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])). + lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])). lookup_qpids(Queues) -> sets:fold( fun(Key, Acc) -> - case mnesia:dirty_read({amqqueue, Key}) of + case mnesia:dirty_read({rabbit_queue, Key}) of [#amqqueue{pid = QPid}] -> [QPid | Acc]; [] -> Acc end @@ -300,13 +296,6 @@ lookup_qpids(Queues) -> %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl -delete_bindings_for_exchange(ExchangeName) -> - indexed_delete( - #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, - fun delete_forward_routes/1, fun mnesia:delete_object/1). -======= delete_exchange_bindings(ExchangeName) -> [begin ok = mnesia:delete_object(rabbit_reverse_route, @@ -318,7 +307,6 @@ delete_exchange_bindings(ExchangeName) -> _ = '_'}}, write)], ok. ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym delete_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, fun delete_forward_routes/1). @@ -328,14 +316,7 @@ delete_transient_queue_bindings(QueueName) -> delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), - indexed_delete( - reverse_route(#route{binding = #binding{queue_name = QueueName, - _ = '_'}}), - fun mnesia:delete_object/1, fun delete_forward_routes/1), [begin -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl - [X] = mnesia:read({exchange, ExchangeName}), -======= ok = FwdDeleteFun(reverse_route(Route)), ok = mnesia:delete_object(rabbit_reverse_route, Route, write) end || Route <- mnesia:match_object( @@ -346,21 +327,13 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> write)], [begin [X] = mnesia:read({rabbit_exchange, ExchangeName}), ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym ok = maybe_auto_delete(X) end || ExchangeName <- Exchanges], ok. -indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) -> - [begin - ok = ReverseDeleteFun(reverse_route(Route)), - ok = ForwardsDeleteFun(Route) - end || Route <- mnesia:match_object(Match)], - ok. - delete_forward_routes(Route) -> - ok = mnesia:delete_object(Route), - ok = mnesia:delete_object(durable_routes, Route, write). + ok = mnesia:delete_object(rabbit_route, Route, write), + ok = mnesia:delete_object(rabbit_durable_route, Route, write). delete_transient_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write). @@ -372,17 +345,18 @@ exchanges_for_queue(QueueName) -> _ = '_'}}), sets:to_list( sets:from_list( - mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). + mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))). has_bindings(ExchangeName) -> MatchHead = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, try - continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) + continue(mnesia:select(rabbit_route, [{MatchHead, [], ['$_']}], + 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(MatchHead) of + case mnesia:match_object(rabbit_route, MatchHead, read) of [] -> false; [_|_] -> true end @@ -394,26 +368,13 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl - fun() -> case mnesia:read({exchange, Exchange}) of - [] -> {error, exchange_not_found}; -======= fun() -> case mnesia:read({rabbit_exchange, Exchange}) of [] -> {error, not_found}; ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym [X] -> Fun(X) end end). call_with_exchange_and_queue(Exchange, Queue, Fun) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl - call_with_exchange( - Exchange, - fun(X) -> case mnesia:read({amqqueue, Queue}) of - [] -> {error, queue_not_found}; - [Q] -> Fun(X, Q) - end -======= rabbit_misc:execute_mnesia_transaction( fun() -> case {mnesia:read({rabbit_exchange, Exchange}), mnesia:read({rabbit_queue, Queue})} of @@ -422,7 +383,6 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) -> {[_], [ ]} -> {error, queue_not_found}; {[ ], [ ]} -> {error, exchange_and_queue_not_found} end ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> @@ -451,13 +411,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of - true -> Fun(durable_routes, #route{binding = Binding}, write); + true -> Fun(rabbit_durable_route, + #route{binding = Binding}, write); false -> ok end, - [ok, ok] = [Fun(element(1, R), R, write) || - R <- tuple_to_list(route_with_reverse(Binding))], + {Route, ReverseRoute} = route_with_reverse(Binding), + ok = Fun(rabbit_route, Route, write), + ok = Fun(rabbit_reverse_route, ReverseRoute, write), ok. list_bindings(VHostPath) -> @@ -468,6 +430,7 @@ list_bindings(VHostPath) -> queue_name = QueueName, args = Arguments}} <- mnesia:dirty_match_object( + rabbit_route, #route{binding = #binding{ exchange_name = rabbit_misc:r(VHostPath, exchange), _ = '_'}, @@ -503,6 +466,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. @@ -548,15 +572,9 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) -> end. unconditional_delete(#exchange{name = ExchangeName}) -> -<<<<<<< /tmp/rabbitmq-server/src/rabbit_exchange.erl - ok = delete_bindings_for_exchange(ExchangeName), - ok = mnesia:delete({durable_exchanges, ExchangeName}), - ok = mnesia:delete({exchange, ExchangeName}). -======= ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}), ok = mnesia:delete({rabbit_exchange, ExchangeName}). ->>>>>>> /tmp/rabbit_exchange.erl~other.4_e6ym %%---------------------------------------------------------------------------- %% EXTENDED API @@ -572,7 +590,7 @@ list_exchange_bindings(ExchangeName) -> #route{binding = #binding{queue_name = QueueName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. % Refactoring is left as an exercise for the reader list_queue_bindings(QueueName) -> @@ -582,4 +600,4 @@ list_queue_bindings(QueueName) -> #route{binding = #binding{exchange_name = ExchangeName, key = RoutingKey, args = Arguments}} - <- mnesia:dirty_match_object(Route)]. + <- mnesia:dirty_match_object(rabbit_route, Route)]. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3f9b6ebb..9f3dcbd0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/2, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/3, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/2 :: (maybe_pid(), pid()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -85,12 +85,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid) -> +can_send(undefined, _QPid, _AckRequired) -> true; -can_send(LimiterPid, QPid) -> +can_send(LimiterPid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, + infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -110,10 +111,13 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) -> +handle_call({can_send, QPid, AckRequired}, _From, + State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = Volume + 1}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end. handle_cast(shutdown, State) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9fc4ff71..4da247a4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -36,9 +36,10 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). --export([r/3, r/2, rs/1]). +-export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). @@ -71,16 +72,19 @@ (atom() | amqp_error(), string(), [any()]) -> no_return()). -spec(protocol_error/4 :: (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(not_found/1 :: (r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> {'ok', any()} | not_found()). --spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> r(K) - when is_subtype(K, atom())). +-spec(r/3 :: (vhost() | r(atom()), K, resource_name()) -> + r(K) when is_subtype(K, atom())). -spec(r/2 :: (vhost(), K) -> #resource{virtual_host :: vhost(), kind :: K, name :: '_'} when is_subtype(K, atom())). +-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> + undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). -spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). -spec(report_cover/0 :: () -> 'ok'). @@ -142,6 +146,8 @@ protocol_error(Error, Explanation, Params, Method) -> CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), exit({amqp, Error, CompleteExplanation, Method}). +not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). + get_config(Key) -> case dirty_read({rabbit_config, Key}) of {ok, {rabbit_config, Key, V}} -> {ok, V}; @@ -172,6 +178,14 @@ r(VHostPath, Kind, Name) when is_binary(Name) andalso is_binary(VHostPath) -> r(VHostPath, Kind) when is_binary(VHostPath) -> #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. +r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> + r_arg(VHostPath, Kind, Table, Key); +r_arg(VHostPath, Kind, Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin); + false -> undefined + end. + rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> lists:flatten(io_lib:format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath])). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d19c37cb..0c573073 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -31,7 +31,7 @@ -module(rabbit_mnesia). --export([ensure_mnesia_dir/0, status/0, init/0, is_db_empty/0, +-export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, reset/0, force_reset/0]). -export([table_names/0]). @@ -47,6 +47,7 @@ -ifdef(use_specs). -spec(status/0 :: () -> [{'nodes' | 'running_nodes', [erlang_node()]}]). +-spec(dir/0 :: () -> string()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). -spec(is_db_empty/0 :: () -> bool()). @@ -131,8 +132,10 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +dir() -> mnesia:system_info(directory). + ensure_mnesia_dir() -> - MnesiaDir = mnesia:system_info(directory) ++ "/", + MnesiaDir = dir() ++ "/", case filelib:ensure_dir(MnesiaDir) of {error, Reason} -> throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); @@ -168,7 +171,7 @@ check_schema_integrity() -> %% it doesn't. cluster_nodes_config_filename() -> - mnesia:system_info(directory) ++ "/cluster_nodes.config". + dir() ++ "/cluster_nodes.config". create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), @@ -284,7 +287,7 @@ create_schema() -> move_db() -> mnesia:stop(), - MnesiaDir = filename:dirname(mnesia:system_info(directory) ++ "/"), + MnesiaDir = filename:dirname(dir() ++ "/"), {{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(), BackupDir = lists:flatten( io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w", @@ -401,7 +404,7 @@ reset(Force) -> ok = delete_cluster_nodes_config(), %% remove persistet messages and any other garbage we find lists:foreach(fun file:delete/1, - filelib:wildcard(mnesia:system_info(directory) ++ "/*")), + filelib:wildcard(dir() ++ "/*")), ok. leave_cluster([], _) -> ok; diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl index f4fa4599..d0d60ddf 100644 --- a/src/rabbit_persister.erl +++ b/src/rabbit_persister.erl @@ -259,7 +259,7 @@ log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, pending_logs = [Message | Logs]}. base_filename() -> - mnesia:system_info(directory) ++ "/rabbit_persister.LOG". + rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". take_snapshot(LogHandle, OldFileName, Snapshot) -> ok = disk_log:sync(LogHandle), diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 988eb651..985ca3e2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -610,9 +610,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath, insist = Insist}, State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = #user{username = Username}}, sock = Sock}) -> - ok = rabbit_access_control:check_vhost_access(User, VHostPath), + ok = rabbit_access_control:check_vhost_access(Username, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, KnownHosts = format_listeners(rabbit_networking:active_listeners()), Redirects = compute_redirects(Insist), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 624822d9..0b06a063 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -32,7 +32,7 @@ -module(rabbit_router). -include("rabbit.hrl"). --behaviour(gen_server). +-behaviour(gen_server2). -export([start_link/0, deliver/5]). @@ -58,7 +58,7 @@ %%---------------------------------------------------------------------------- start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -ifdef(BUG19758). @@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false, %% than the non-immediate case below. {ok, lists:flatmap( fun ({Node, QPids}) -> - gen_server:cast( + gen_server2:cast( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}), QPids @@ -110,7 +110,7 @@ deliver_per_node(NodeQPids, Mandatory, Immediate, Txn, Message) -> R = rabbit_misc:upmap( fun ({Node, QPids}) -> - try gen_server:call( + try gen_server2:call( {?SERVER, Node}, {deliver, QPids, Mandatory, Immediate, Txn, Message}, infinity) @@ -144,7 +144,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message}, spawn( fun () -> R = run_bindings(QPids, Mandatory, Immediate, Txn, Message), - gen_server:reply(From, R) + gen_server2:reply(From, R) end), {noreply, State}. |