summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.hgignore1
-rw-r--r--Makefile23
-rw-r--r--docs/rabbitmqctl.1.pod63
-rw-r--r--ebin/rabbit.app57
-rw-r--r--ebin/rabbit_app.in21
-rw-r--r--generate_app10
-rw-r--r--include/rabbit.hrl7
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile115
-rw-r--r--packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff10
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults7
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper13
-rw-r--r--src/buffering_proxy.erl108
-rw-r--r--src/gen_server2.erl898
-rw-r--r--src/priority_queue.erl153
-rw-r--r--src/rabbit.erl42
-rw-r--r--src/rabbit_access_control.erl189
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_amqqueue.erl96
-rw-r--r--src/rabbit_amqqueue_process.erl178
-rw-r--r--src/rabbit_channel.erl310
-rw-r--r--src/rabbit_control.erl93
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_exchange.erl234
-rw-r--r--src/rabbit_framing_channel.erl6
-rw-r--r--src/rabbit_guid.erl3
-rw-r--r--src/rabbit_limiter.erl195
-rw-r--r--src/rabbit_misc.erl36
-rw-r--r--src/rabbit_mnesia.erl75
-rw-r--r--src/rabbit_networking.erl8
-rw-r--r--src/rabbit_persister.erl54
-rw-r--r--src/rabbit_reader.erl32
-rw-r--r--src/rabbit_router.erl13
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_tests.erl88
34 files changed, 2376 insertions, 777 deletions
diff --git a/.hgignore b/.hgignore
index 28f9cfd8..35607765 100644
--- a/.hgignore
+++ b/.hgignore
@@ -9,6 +9,7 @@ syntax: regexp
^include/rabbit_framing.hrl$
^src/rabbit_framing.erl$
^rabbit.plt$
+^ebin/rabbit.app$
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
diff --git a/Makefile b/Makefile
index 64e008c1..4ff8573a 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,8 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
@@ -39,9 +40,15 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
-$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl
+$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
+ escript generate_app $(EBIN_DIR) < $< > $@
+
+$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
-# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) $<
+
+$(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit.hrl $(EBIN_DIR)/gen_server2.beam
+ erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
+# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
@@ -52,12 +59,12 @@ $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py
$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS)
erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().'
-dialyze: $(TARGETS)
+dialyze: $(BEAM_TARGETS)
dialyzer -c $?
clean: cleandb
rm -f $(EBIN_DIR)/*.beam
- rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
+ rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
@@ -121,11 +128,11 @@ srcdist: distclean
>> $(TARGET_SRC_DIR)/INSTALL
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
- >> $(TARGET_SRC_DIR)/README
- sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app && rm -f $(TARGET_SRC_DIR)/ebin/rabbit.app.save
+ >> $(TARGET_SRC_DIR)/BUILD
+ sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
- cp codegen.py Makefile $(TARGET_SRC_DIR)
+ cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
cp -r docs $(TARGET_SRC_DIR)
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 17dd8338..a0232a40 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -26,7 +26,7 @@ B<-n> I<node>
startup time). The output of hostname -s is usually the correct
suffix to use after the "@" sign. See rabbitmq-server(1) for
details of configuring the RabbitMQ broker.
-
+
B<-q>
quiet output mode is selected with the B<-q> flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -43,32 +43,32 @@ stop_app
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
-
+
start_app
start the RabbitMQ application.
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
-
+
status
display various information about the RabbitMQ broker, such as
whether the RabbitMQ application on the current node, its version
number, what nodes are part of the broker, which of these are
running.
-
+
force
return a RabbitMQ node to its virgin state.
Removes the node from any cluster it belongs to, removes all data
from the management database, such as configured users, vhosts and
deletes all persistent messages.
-
+
force_reset
the same as I<force> command, but resets the node unconditionally,
regardless of the current management database state and cluster
configuration.
It should only be used as a last resort if the database or cluster
configuration has been corrupted.
-
+
rotate_logs [suffix]
instruct the RabbitMQ node to rotate the log files. The RabbitMQ
broker will attempt to append the current contents of the log file
@@ -81,29 +81,32 @@ rotate_logs [suffix]
specified.
This command might be helpful when you are e.g. writing your own
logrotate script and you do not want to restart the RabbitMQ node.
-
+
cluster I<clusternode> ...
instruct the node to become member of a cluster with the specified
nodes determined by I<clusternode> option(s).
See http://www.rabbitmq.com/clustering.html for more information
about clustering.
-
+
=head2 USER MANAGEMENT
-
+
add_user I<username> I<password>
create a user named I<username> with (initial) password I<password>.
-
+
+delete_user I<username>
+ delete the user named I<username>.
+
change_password I<username> I<newpassword>
change the password for the user named I<username> to I<newpassword>.
list_users
list all users.
-
+
=head2 ACCESS CONTROL
add_vhost I<vhostpath>
create a new virtual host called I<vhostpath>.
-
+
delete_vhost I<vhostpath>
delete a virtual host I<vhostpath>.
That command deletes also all its exchanges, queues and user
@@ -111,19 +114,25 @@ delete_vhost I<vhostpath>
list_vhosts
list all virtual hosts.
-
-map_user_vhost I<username> I<vhostpath>
- grant the user named I<username> access to the virtual host called
- I<vhostpath>.
-
-unmap_user_vhost I<username> I<vhostpath>
- deny the user named I<username> access to the virtual host called
+
+set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
+ set the permissions for the user named I<username> in the virtual
+ host I<vhostpath>, granting 'configure', 'write' and 'read' access
+ to resources with names matching the first, second and third
+ I<regexp>, respectively.
+
+clear_permissions [-p I<vhostpath>] I<username>
+ remove the permissions for the user named I<username> in the
+ virtual host I<vhostpath>.
+
+list_permissions [-p I<vhostpath>]
+ list all the users and their permissions in the virtual host
I<vhostpath>.
-list_user_vhost I<username>
- list all the virtual hosts to which the user named I<username> has
- been granted access.
-
+list_user_permissions I<username>
+ list the permissions of the user named I<username> across all
+ virtual hosts.
+
=head2 SERVER STATUS
list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
@@ -231,7 +240,7 @@ peer_address
peer_port
peer port
-
+
state
connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
B<running>, B<closing>, B<closed>)
@@ -265,7 +274,7 @@ send_cnt
send_pend
send queue size
-
+
=back
The list_queues, list_exchanges and list_bindings commands accept an
@@ -279,12 +288,12 @@ Create a user named foo with (initial) password bar at the Erlang node
rabbit@test:
rabbitmqctl -n rabbit@test add_user foo bar
-
+
Grant user named foo access to the virtual host called test at the
default Erlang node:
rabbitmqctl map_user_vhost foo test
-
+
Append the current logs' content to the files with ".1" suffix and reopen
them:
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
deleted file mode 100644
index 0d714fdf..00000000
--- a/ebin/rabbit.app
+++ /dev/null
@@ -1,57 +0,0 @@
-{application, rabbit, %% -*- erlang -*-
- [{description, "RabbitMQ"},
- {id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
- {modules, [buffering_proxy,
- rabbit_access_control,
- rabbit_alarm,
- rabbit_amqqueue,
- rabbit_amqqueue_process,
- rabbit_amqqueue_sup,
- rabbit_binary_generator,
- rabbit_binary_parser,
- rabbit_channel,
- rabbit_control,
- rabbit,
- rabbit_error_logger,
- rabbit_error_logger_file_h,
- rabbit_exchange,
- rabbit_framing_channel,
- rabbit_framing,
- rabbit_heartbeat,
- rabbit_load,
- rabbit_log,
- rabbit_memsup_linux,
- rabbit_misc,
- rabbit_mnesia,
- rabbit_multi,
- rabbit_networking,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_reader,
- rabbit_router,
- rabbit_sasl_report_file_h,
- rabbit_sup,
- rabbit_tests,
- rabbit_tracer,
- rabbit_writer,
- tcp_acceptor,
- tcp_acceptor_sup,
- tcp_client_sup,
- tcp_listener,
- tcp_listener_sup]},
- {registered, [rabbit_amqqueue_sup,
- rabbit_log,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_router,
- rabbit_sup,
- rabbit_tcp_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
- {mod, {rabbit, []}},
- {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
- {extra_startup_steps, []},
- {default_user, <<"guest">>},
- {default_pass, <<"guest">>},
- {default_vhost, <<"/">>},
- {memory_alarms, auto}]}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
new file mode 100644
index 00000000..5be07492
--- /dev/null
+++ b/ebin/rabbit_app.in
@@ -0,0 +1,21 @@
+{application, rabbit, %% -*- erlang -*-
+ [{description, "RabbitMQ"},
+ {id, "RabbitMQ"},
+ {vsn, "%%VERSION%%"},
+ {modules, []},
+ {registered, [rabbit_amqqueue_sup,
+ rabbit_log,
+ rabbit_node_monitor,
+ rabbit_persister,
+ rabbit_router,
+ rabbit_sup,
+ rabbit_tcp_client_sup]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {mod, {rabbit, []}},
+ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
+ {extra_startup_steps, []},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_vhost, <<"/">>},
+ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {memory_alarms, auto}]}]}.
diff --git a/generate_app b/generate_app
new file mode 100644
index 00000000..62301292
--- /dev/null
+++ b/generate_app
@@ -0,0 +1,10 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+main([BeamDir]) ->
+ Modules = [list_to_atom(filename:basename(F, ".beam")) ||
+ F <- filelib:wildcard("*.beam", BeamDir)],
+ {ok, {application, Application, Properties}} = io:read(''),
+ NewProperties = lists:keyreplace(modules, 1, Properties,
+ {modules, Modules}),
+ io:format("~p.", [{application, Application, NewProperties}]).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d07aeaf8..c707112f 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -30,7 +30,9 @@
%%
-record(user, {username, password}).
+-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
+-record(user_permission, {user_vhost, permission}).
-record(vhost, {virtual_host, dummy}).
@@ -74,6 +76,7 @@
-type(thunk(T) :: fun(() -> T)).
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).
+-type(regexp() :: binary()).
%% this is really an abstract type, but dialyzer does not support them
-type(guid() :: any()).
@@ -88,6 +91,10 @@
-type(user() ::
#user{username :: username(),
password :: password()}).
+-type(permission() ::
+ #permission{configure :: regexp(),
+ write :: regexp(),
+ read :: regexp()}).
-type(amqqueue() ::
#amqqueue{name :: queue_name(),
durable :: bool(),
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index 82ae62aa..d9d16dbb 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -1,79 +1,98 @@
-# $Id$ -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4
-
-PortSystem 1.0
-name rabbitmq-server
-version 1.3.0
-revision 0
-categories net
-maintainers tonyg@rabbitmq.com
-platforms darwin
-description The RabbitMQ AMQP Server
-long_description \
+# -*- coding: utf-8; mode: tcl; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- vim:fenc=utf-8:filetype=tcl:et:sw=4:ts=4:sts=4
+# $Id$
+
+PortSystem 1.0
+
+name rabbitmq-server
+version 1.5.3
+categories net
+maintainers tonyg@rabbitmq.com
+platforms darwin
+description The RabbitMQ AMQP Server
+long_description \
RabbitMQ is an implementation of AMQP, the emerging standard for \
high performance enterprise messaging. The RabbitMQ server is a \
robust and scalable implementation of an AMQP broker.
-homepage http://www.rabbitmq.com/
-master_sites http://www.rabbitmq.com/releases/source/
-distname rabbitmq-${version}
+homepage http://www.rabbitmq.com/
+master_sites http://www.rabbitmq.com/releases/rabbitmq-server/v${version}/
checksums \
- md5 46ee6dbbacdc67b25cc6ccd9c394b6f2 \
- sha1 67e1e640136a1993567ace97dc5f67b1ad8e6304 \
- rmd160 9e92502d36ab5cd1e3f0d39a46bb512b9440f35a
+ md5 3242a67885c2471b5ab62254bf024679 \
+ sha1 f4d6a01eaa2c74fa32f567fe410d21d9be1b43aa \
+ rmd160 1a1c4b97d765548028c161d1617905151ca9e040
-depends_build port:erlang
-depends_run port:erlang
+depends_build port:erlang port:py25-simplejson
+depends_run port:erlang
-use_configure no
+set serveruser rabbitmq
+set servergroup rabbitmq
+set serverhome ${prefix}/var/lib/rabbitmq
+set logdir ${prefix}/var/log/rabbitmq
+set mnesiadbdir ${prefix}/var/lib/rabbitmq/mnesia
+set plistloc ${prefix}/etc/LaunchDaemons/org.macports.rabbitmq-server
-worksrcdir rabbitmq-${version}/erlang/rabbit
+use_configure no
use_parallel_build yes
+build.args PYTHON=${prefix}/bin/python2.5
+
destroot.destdir \
- DIST_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
- SBIN_DIR=${destroot}${prefix}/sbin
-destroot.target dist-unix
+ TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
+ SBIN_DIR=${destroot}${prefix}/sbin \
+ MAN_DIR=${destroot}${prefix}/share/man
destroot.keepdirs \
- ${destroot}${prefix}/var/lib/rabbitmq/pids \
- ${destroot}${prefix}/var/log/rabbitmq \
- ${destroot}${prefix}/var/lib/rabbitmq/mnesia
+ ${destroot}${logdir} \
+ ${destroot}${mnesiadbdir}
pre-destroot {
- addgroup rabbitmq
- adduser rabbitmq gid=[existsgroup rabbitmq] realname=RabbitMQ\ Server home=${prefix}/var/lib/rabbitmq
+ addgroup ${servergroup}
+ adduser ${serveruser} gid=[existsgroup ${servergroup}] realname=RabbitMQ\ Server home=${serverhome}
}
post-destroot {
- xinstall -d ${destroot}${prefix}/etc/default
- xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/log/rabbitmq
- xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq
- xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq/pids
- xinstall -d -g [existsgroup rabbitmq] -m 775 ${destroot}${prefix}/var/lib/rabbitmq/mnesia
- file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real
- xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin
- file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl
- file copy ${filespath}/rabbitmq-defaults ${destroot}${prefix}/etc/default/rabbitmq
- reinplace "s:^CLUSTER_CONFIG_FILE=:CLUSTER_CONFIG_FILE=${prefix}:" \
+ xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${logdir}
+ xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${serverhome}
+ xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
+
+ reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
+ ${destroot}${prefix}/sbin/rabbitmq-multi \
+ ${destroot}${prefix}/sbin/rabbitmq-server \
+ ${destroot}${prefix}/sbin/rabbitmqctl
+ reinplace -E "s:(RABBITMQ_CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
+ ${destroot}${prefix}/sbin/rabbitmq-multi \
+ ${destroot}${prefix}/sbin/rabbitmq-server \
+ ${destroot}${prefix}/sbin/rabbitmqctl
+ reinplace -E "s:(RABBITMQ_LOG_BASE)=/:\\1=${prefix}/:" \
+ ${destroot}${prefix}/sbin/rabbitmq-multi \
+ ${destroot}${prefix}/sbin/rabbitmq-server \
+ ${destroot}${prefix}/sbin/rabbitmqctl
+ reinplace -E "s:(RABBITMQ_MNESIA_BASE)=/:\\1=${prefix}/:" \
${destroot}${prefix}/sbin/rabbitmq-multi \
${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl \
- ${destroot}${prefix}/sbin/rabbitmqctl_real
- reinplace "s:^CONFIG_FILE=:CONFIG_FILE=${prefix}:" \
+ ${destroot}${prefix}/sbin/rabbitmqctl
+ reinplace -E "s:(RABBITMQ_PIDS_FILE)=/:\\1=${prefix}/:" \
${destroot}${prefix}/sbin/rabbitmq-multi \
${destroot}${prefix}/sbin/rabbitmq-server \
- ${destroot}${prefix}/sbin/rabbitmqctl \
- ${destroot}${prefix}/sbin/rabbitmqctl_real
- reinplace "s|@PREFIX@|${prefix}|" \
- ${destroot}${prefix}/sbin/rabbitmqctl \
- ${destroot}${prefix}/etc/default/rabbitmq
+ ${destroot}${prefix}/sbin/rabbitmqctl
+
+ file rename ${destroot}${prefix}/sbin/rabbitmqctl ${destroot}${prefix}/sbin/rabbitmqctl_real
+ xinstall -m 555 ${filespath}/rabbitmqctl_wrapper ${destroot}${prefix}/sbin
+ file rename ${destroot}${prefix}/sbin/rabbitmqctl_wrapper ${destroot}${prefix}/sbin/rabbitmqctl
+
+ reinplace -E "s:@PREFIX@:${prefix}:" \
+ ${destroot}${prefix}/sbin/rabbitmqctl
+}
+
+pre-install {
+ system "cd ${destroot}${plistloc}; patch <${filespath}/patch-org.macports.rabbitmq-server.plist.diff"
}
startupitem.create yes
startupitem.init "PATH=${prefix}/bin:${prefix}/sbin:\$PATH; export PATH"
-startupitem.start "su rabbitmq -c rabbitmq-server 2>&1"
+startupitem.start "rabbitmq-server 2>&1"
startupitem.stop "rabbitmqctl stop 2>&1"
startupitem.logfile ${prefix}/var/log/rabbitmq/startupitem.log
diff --git a/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff b/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff
new file mode 100644
index 00000000..45b49496
--- /dev/null
+++ b/packaging/macports/net/rabbitmq-server/files/patch-org.macports.rabbitmq-server.plist.diff
@@ -0,0 +1,10 @@
+--- org.macports.rabbitmq-server.plist.old 2009-02-26 08:00:31.000000000 -0800
++++ org.macports.rabbitmq-server.plist 2009-02-26 08:01:27.000000000 -0800
+@@ -22,6 +22,7 @@
+ <string>;</string>
+ <string>--pid=none</string>
+ </array>
++<key>UserName</key><string>rabbitmq</string>
+ <key>Debug</key><false/>
+ <key>Disabled</key><true/>
+ <key>OnDemand</key><false/>
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults b/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults
deleted file mode 100644
index 1f9aad11..00000000
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmq-defaults
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/bin/sh
-# defaults file for rabbitmq-server
-#
-
-PIDS_FILE=@PREFIX@/var/lib/rabbitmq/pids
-LOG_BASE=@PREFIX@/var/log/rabbitmq
-MNESIA_BASE=@PREFIX@/var/lib/rabbitmq/mnesia
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
index 392c82ff..1996811e 100644
--- a/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
+++ b/packaging/macports/net/rabbitmq-server/files/rabbitmqctl_wrapper
@@ -1,13 +1,2 @@
#!/bin/bash
-# Escape spaces and quotes, because shell is revolting.
-for arg in "$@" ; do
- # Escape quotes in parameters, so that they're passed through cleanly.
- arg=$(sed -e 's/"/\\"/' <<-END
- $arg
- END
- )
- CMDLINE="${CMDLINE} \"${arg}\""
-done
-
-cd /
-exec su rabbitmq -c "@PREFIX@/sbin/rabbitmqctl_real ${CMDLINE}"
+exec sudo -H -u rabbitmq "@PREFIX@/sbin/rabbitmqctl_real" "$@"
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl
deleted file mode 100644
index 344b719a..00000000
--- a/src/buffering_proxy.erl
+++ /dev/null
@@ -1,108 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(buffering_proxy).
-
--export([start_link/2]).
-
-%% internal
-
--export([mainloop/4, drain/2]).
--export([proxy_loop/3]).
-
--define(HIBERNATE_AFTER, 5000).
-
-%%----------------------------------------------------------------------------
-
-start_link(M, A) ->
- spawn_link(
- fun () -> process_flag(trap_exit, true),
- ProxyPid = self(),
- Ref = make_ref(),
- Pid = spawn_link(
- fun () -> ProxyPid ! Ref,
- mainloop(ProxyPid, Ref, M,
- M:init(ProxyPid, A)) end),
- proxy_loop(Ref, Pid, empty)
- end).
-
-%%----------------------------------------------------------------------------
-
-mainloop(ProxyPid, Ref, M, State) ->
- NewState =
- receive
- {Ref, Messages} ->
- NewSt =
- lists:foldl(fun (Msg, S) ->
- drain(M, M:handle_message(Msg, S))
- end, State, lists:reverse(Messages)),
- ProxyPid ! Ref,
- NewSt;
- Msg -> M:handle_message(Msg, State)
- after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop,
- [ProxyPid, Ref, M, State])
- end,
- ?MODULE:mainloop(ProxyPid, Ref, M, NewState).
-
-drain(M, State) ->
- receive
- Msg -> ?MODULE:drain(M, M:handle_message(Msg, State))
- after 0 ->
- State
- end.
-
-proxy_loop(Ref, Pid, State) ->
- receive
- Ref ->
- ?MODULE:proxy_loop(
- Ref, Pid,
- case State of
- empty -> waiting;
- waiting -> exit(duplicate_next);
- Messages -> Pid ! {Ref, Messages}, empty
- end);
- {'EXIT', Pid, Reason} ->
- exit(Reason);
- {'EXIT', _, Reason} ->
- exit(Pid, Reason),
- ?MODULE:proxy_loop(Ref, Pid, State);
- Msg ->
- ?MODULE:proxy_loop(
- Ref, Pid,
- case State of
- empty -> [Msg];
- waiting -> Pid ! {Ref, [Msg]}, empty;
- Messages -> [Msg | Messages]
- end)
- after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, proxy_loop, [Ref, Pid, State])
- end.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
new file mode 100644
index 00000000..ba8becfc
--- /dev/null
+++ b/src/gen_server2.erl
@@ -0,0 +1,898 @@
+%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% distribution, with the following modifications:
+%%
+%% 1) the module name is gen_server2
+%%
+%% 2) more efficient handling of selective receives in callbacks
+%% gen_server2 processes drain their message queue into an internal
+%% buffer before invoking any callback module functions. Messages are
+%% dequeued from the buffer for processing. Thus the effective message
+%% queue of a gen_server2 process is the concatenation of the internal
+%% buffer and the real message queue.
+%% As a result of the draining, any selective receive invoked inside a
+%% callback is less likely to have to scan a large message queue.
+%%
+%% 3) gen_server2:cast is guaranteed to be order-preserving
+%% The original code could reorder messages when communicating with a
+%% process on a remote node that was not currently connected.
+%%
+%% 4) The new functions gen_server2:pcall/3, pcall/4, and pcast/3
+%% allow callers to attach priorities to requests. Requests with
+%% higher priorities are processed before requests with lower
+%% priorities. The default priority is 0.
+%%
+%% All modifications are (C) 2009 LShift Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+%%
+-module(gen_server2).
+
+%%% ---------------------------------------------------
+%%%
+%%% The idea behind THIS server is that the user module
+%%% provides (different) functions to handle different
+%%% kind of inputs.
+%%% If the Parent process terminates the Module:terminate/2
+%%% function is called.
+%%%
+%%% The user module should export:
+%%%
+%%% init(Args)
+%%% ==> {ok, State}
+%%% {ok, State, Timeout}
+%%% ignore
+%%% {stop, Reason}
+%%%
+%%% handle_call(Msg, {From, Tag}, State)
+%%%
+%%% ==> {reply, Reply, State}
+%%% {reply, Reply, State, Timeout}
+%%% {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, Reply, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_cast(Msg, State)
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term terminate(State) is called
+%%%
+%%% handle_info(Info, State) Info is e.g. {'EXIT', P, R}, {nodedown, N}, ...
+%%%
+%%% ==> {noreply, State}
+%%% {noreply, State, Timeout}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% terminate(Reason, State) Let the user module clean up
+%%% always called when server terminates
+%%%
+%%% ==> ok
+%%%
+%%%
+%%% The work flow (of the server) can be described as follows:
+%%%
+%%% User module Generic
+%%% ----------- -------
+%%% start -----> start
+%%% init <----- .
+%%%
+%%% loop
+%%% handle_call <----- .
+%%% -----> reply
+%%%
+%%% handle_cast <----- .
+%%%
+%%% handle_info <----- .
+%%%
+%%% terminate <----- .
+%%%
+%%% -----> reply
+%%%
+%%%
+%%% ---------------------------------------------------
+
+%% API
+-export([start/3, start/4,
+ start_link/3, start_link/4,
+ call/2, call/3, pcall/3, pcall/4,
+ cast/2, pcast/3, reply/2,
+ abcast/2, abcast/3,
+ multi_call/2, multi_call/3, multi_call/4,
+ enter_loop/3, enter_loop/4, enter_loop/5]).
+
+-export([behaviour_info/1]).
+
+%% System exports
+-export([system_continue/3,
+ system_terminate/4,
+ system_code_change/4,
+ format_status/2]).
+
+%% Internal exports
+-export([init_it/6, print_event/3]).
+
+-import(error_logger, [format/2]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+behaviour_info(callbacks) ->
+ [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
+ {terminate,2},{code_change,3}];
+behaviour_info(_Other) ->
+ undefined.
+
+%%% -----------------------------------------------------------------
+%%% Starts a generic server.
+%%% start(Mod, Args, Options)
+%%% start(Name, Mod, Args, Options)
+%%% start_link(Mod, Args, Options)
+%%% start_link(Name, Mod, Args, Options) where:
+%%% Name ::= {local, atom()} | {global, atom()}
+%%% Mod ::= atom(), callback module implementing the 'real' server
+%%% Args ::= term(), init arguments (to Mod:init/1)
+%%% Options ::= [{timeout, Timeout} | {debug, [Flag]}]
+%%% Flag ::= trace | log | {logfile, File} | statistics | debug
+%%% (debug == log && statistics)
+%%% Returns: {ok, Pid} |
+%%% {error, {already_started, Pid}} |
+%%% {error, Reason}
+%%% -----------------------------------------------------------------
+start(Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Mod, Args, Options).
+
+start(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, nolink, Name, Mod, Args, Options).
+
+start_link(Mod, Args, Options) ->
+ gen:start(?MODULE, link, Mod, Args, Options).
+
+start_link(Name, Mod, Args, Options) ->
+ gen:start(?MODULE, link, Name, Mod, Args, Options).
+
+
+%% -----------------------------------------------------------------
+%% Make a call to a generic server.
+%% If the server is located at another node, that node will
+%% be monitored.
+%% If the client is trapping exits and is linked server termination
+%% is handled here (? Shall we do that here (or rely on timeouts) ?).
+%% -----------------------------------------------------------------
+call(Name, Request) ->
+ case catch gen:call(Name, '$gen_call', Request) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request]}})
+ end.
+
+call(Name, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_call', Request, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
+ end.
+
+pcall(Name, Priority, Request) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request]}})
+ end.
+
+pcall(Name, Priority, Request, Timeout) ->
+ case catch gen:call(Name, '$gen_pcall', {Priority, Request}, Timeout) of
+ {ok,Res} ->
+ Res;
+ {'EXIT',Reason} ->
+ exit({Reason, {?MODULE, pcall, [Name, Priority, Request, Timeout]}})
+ end.
+
+%% -----------------------------------------------------------------
+%% Make a cast to a generic server.
+%% -----------------------------------------------------------------
+cast({global,Name}, Request) ->
+ catch global:send(Name, cast_msg(Request)),
+ ok;
+cast({Name,Node}=Dest, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_atom(Dest) ->
+ do_cast(Dest, Request);
+cast(Dest, Request) when is_pid(Dest) ->
+ do_cast(Dest, Request).
+
+do_cast(Dest, Request) ->
+ do_send(Dest, cast_msg(Request)),
+ ok.
+
+cast_msg(Request) -> {'$gen_cast',Request}.
+
+pcast({global,Name}, Priority, Request) ->
+ catch global:send(Name, cast_msg(Priority, Request)),
+ ok;
+pcast({Name,Node}=Dest, Priority, Request) when is_atom(Name), is_atom(Node) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_atom(Dest) ->
+ do_cast(Dest, Priority, Request);
+pcast(Dest, Priority, Request) when is_pid(Dest) ->
+ do_cast(Dest, Priority, Request).
+
+do_cast(Dest, Priority, Request) ->
+ do_send(Dest, cast_msg(Priority, Request)),
+ ok.
+
+cast_msg(Priority, Request) -> {'$gen_pcast', {Priority, Request}}.
+
+%% -----------------------------------------------------------------
+%% Send a reply to the client.
+%% -----------------------------------------------------------------
+reply({To, Tag}, Reply) ->
+ catch To ! {Tag, Reply}.
+
+%% -----------------------------------------------------------------
+%% Asyncronous broadcast, returns nothing, it's just send'n prey
+%%-----------------------------------------------------------------
+abcast(Name, Request) when is_atom(Name) ->
+ do_abcast([node() | nodes()], Name, cast_msg(Request)).
+
+abcast(Nodes, Name, Request) when is_list(Nodes), is_atom(Name) ->
+ do_abcast(Nodes, Name, cast_msg(Request)).
+
+do_abcast([Node|Nodes], Name, Msg) when is_atom(Node) ->
+ do_send({Name,Node},Msg),
+ do_abcast(Nodes, Name, Msg);
+do_abcast([], _,_) -> abcast.
+
+%%% -----------------------------------------------------------------
+%%% Make a call to servers at several nodes.
+%%% Returns: {[Replies],[BadNodes]}
+%%% A Timeout can be given
+%%%
+%%% A middleman process is used in case late answers arrives after
+%%% the timeout. If they would be allowed to glog the callers message
+%%% queue, it would probably become confused. Late answers will
+%%% now arrive to the terminated middleman and so be discarded.
+%%% -----------------------------------------------------------------
+multi_call(Name, Req)
+ when is_atom(Name) ->
+ do_multi_call([node() | nodes()], Name, Req, infinity).
+
+multi_call(Nodes, Name, Req)
+ when is_list(Nodes), is_atom(Name) ->
+ do_multi_call(Nodes, Name, Req, infinity).
+
+multi_call(Nodes, Name, Req, infinity) ->
+ do_multi_call(Nodes, Name, Req, infinity);
+multi_call(Nodes, Name, Req, Timeout)
+ when is_list(Nodes), is_atom(Name), is_integer(Timeout), Timeout >= 0 ->
+ do_multi_call(Nodes, Name, Req, Timeout).
+
+
+%%-----------------------------------------------------------------
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%%
+%% Description: Makes an existing process into a gen_server.
+%% The calling process will enter the gen_server receive
+%% loop and become a gen_server process.
+%% The process *must* have been started using one of the
+%% start functions in proc_lib, see proc_lib(3).
+%% The user is responsible for any initialization of the
+%% process, including registering a name for it.
+%%-----------------------------------------------------------------
+enter_loop(Mod, Options, State) ->
+ enter_loop(Mod, Options, State, self(), infinity).
+
+enter_loop(Mod, Options, State, ServerName = {_, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity);
+
+enter_loop(Mod, Options, State, Timeout) ->
+ enter_loop(Mod, Options, State, self(), Timeout).
+
+enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ Name = get_proc_name(ServerName),
+ Parent = get_parent(),
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+
+%%%========================================================================
+%%% Gen-callback functions
+%%%========================================================================
+
+%%% ---------------------------------------------------
+%%% Initiate the new process.
+%%% Register the name using the Rfunc function
+%%% Calls the Mod:init/Args function.
+%%% Finally an acknowledge is sent to Parent and the main
+%%% loop is entered.
+%%% ---------------------------------------------------
+init_it(Starter, self, Name, Mod, Args, Options) ->
+ init_it(Starter, self(), Name, Mod, Args, Options);
+init_it(Starter, Parent, Name, Mod, Args, Options) ->
+ Debug = debug_options(Name, Options),
+ Queue = priority_queue:new(),
+ case catch Mod:init(Args) of
+ {ok, State} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ {ok, State, Timeout} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ {stop, Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ ignore ->
+ proc_lib:init_ack(Starter, ignore),
+ exit(normal);
+ {'EXIT', Reason} ->
+ proc_lib:init_ack(Starter, {error, Reason}),
+ exit(Reason);
+ Else ->
+ Error = {bad_return_value, Else},
+ proc_lib:init_ack(Starter, {error, Error}),
+ exit(Error)
+ end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+%%% ---------------------------------------------------
+%%% The MAIN loop.
+%%% ---------------------------------------------------
+loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+ receive
+ Input -> loop(Parent, Name, State, Mod,
+ Time, in(Input, Queue), Debug)
+ after 0 ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ receive
+ Input ->
+ loop(Parent, Name, State, Mod,
+ Time, in(Input, Queue1), Debug)
+ after Time ->
+ process_msg(Parent, Name, State, Mod,
+ Time, Queue1, Debug, timeout)
+ end
+ end
+ end.
+
+in({'$gen_pcast', {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
+in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
+ priority_queue:in({'$gen_call', From, Msg}, Priority, Queue);
+in(Input, Queue) ->
+ priority_queue:in(Input, Queue).
+
+process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+ case Msg of
+ {system, From, Req} ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, Queue]);
+ {'EXIT', Parent, Reason} ->
+ terminate(Reason, Name, Msg, Mod, State, Debug);
+ _Msg when Debug =:= [] ->
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ _Msg ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
+ Name, {in, Msg}),
+ handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ end.
+
+%%% ---------------------------------------------------
+%%% Send/recive functions
+%%% ---------------------------------------------------
+do_send(Dest, Msg) ->
+ catch erlang:send(Dest, Msg).
+
+do_multi_call(Nodes, Name, Req, infinity) ->
+ Tag = make_ref(),
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ rec_nodes(Tag, Monitors, Name, undefined);
+do_multi_call(Nodes, Name, Req, Timeout) ->
+ Tag = make_ref(),
+ Caller = self(),
+ Receiver =
+ spawn(
+ fun() ->
+ %% Middleman process. Should be unsensitive to regular
+ %% exit signals. The sychronization is needed in case
+ %% the receiver would exit before the caller started
+ %% the monitor.
+ process_flag(trap_exit, true),
+ Mref = erlang:monitor(process, Caller),
+ receive
+ {Caller,Tag} ->
+ Monitors = send_nodes(Nodes, Name, Tag, Req),
+ TimerId = erlang:start_timer(Timeout, self(), ok),
+ Result = rec_nodes(Tag, Monitors, Name, TimerId),
+ exit({self(),Tag,Result});
+ {'DOWN',Mref,_,_,_} ->
+ %% Caller died before sending us the go-ahead.
+ %% Give up silently.
+ exit(normal)
+ end
+ end),
+ Mref = erlang:monitor(process, Receiver),
+ Receiver ! {self(),Tag},
+ receive
+ {'DOWN',Mref,_,_,{Receiver,Tag,Result}} ->
+ Result;
+ {'DOWN',Mref,_,_,Reason} ->
+ %% The middleman code failed. Or someone did
+ %% exit(_, kill) on the middleman process => Reason==killed
+ exit(Reason)
+ end.
+
+send_nodes(Nodes, Name, Tag, Req) ->
+ send_nodes(Nodes, Name, Tag, Req, []).
+
+send_nodes([Node|Tail], Name, Tag, Req, Monitors)
+ when is_atom(Node) ->
+ Monitor = start_monitor(Node, Name),
+ %% Handle non-existing names in rec_nodes.
+ catch {Name, Node} ! {'$gen_call', {self(), {Tag, Node}}, Req},
+ send_nodes(Tail, Name, Tag, Req, [Monitor | Monitors]);
+send_nodes([_Node|Tail], Name, Tag, Req, Monitors) ->
+ %% Skip non-atom Node
+ send_nodes(Tail, Name, Tag, Req, Monitors);
+send_nodes([], _Name, _Tag, _Req, Monitors) ->
+ Monitors.
+
+%% Against old nodes:
+%% If no reply has been delivered within 2 secs. (per node) check that
+%% the server really exists and wait for ever for the answer.
+%%
+%% Against contemporary nodes:
+%% Wait for reply, server 'DOWN', or timeout from TimerId.
+
+rec_nodes(Tag, Nodes, Name, TimerId) ->
+ rec_nodes(Tag, Nodes, Name, [], [], 2000, TimerId).
+
+rec_nodes(Tag, [{N,R}|Tail], Name, Badnodes, Replies, Time, TimerId ) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, Time, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], Time, TimerId);
+ {timeout, TimerId, _} ->
+ unmonitor(R),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes(Tag, [N|Tail], Name, Badnodes, Replies, Time, TimerId) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes], Replies, 2000, TimerId);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, Badnodes,
+ [{N,Reply}|Replies], 2000, TimerId);
+ {timeout, TimerId, _} ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ %% Collect all replies that already have arrived
+ rec_nodes_rest(Tag, Tail, Name, [N | Badnodes], Replies)
+ after Time ->
+ case rpc:call(N, erlang, whereis, [Name]) of
+ Pid when is_pid(Pid) -> % It exists try again.
+ rec_nodes(Tag, [N|Tail], Name, Badnodes,
+ Replies, infinity, TimerId);
+ _ -> % badnode
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes(Tag, Tail, Name, [N|Badnodes],
+ Replies, 2000, TimerId)
+ end
+ end;
+rec_nodes(_, [], _, Badnodes, Replies, _, TimerId) ->
+ case catch erlang:cancel_timer(TimerId) of
+ false -> % It has already sent it's message
+ receive
+ {timeout, TimerId, _} -> ok
+ after 0 ->
+ ok
+ end;
+ _ -> % Timer was cancelled, or TimerId was 'undefined'
+ ok
+ end,
+ {Replies, Badnodes}.
+
+%% Collect all replies that already have arrived
+rec_nodes_rest(Tag, [{N,R}|Tail], Name, Badnodes, Replies) ->
+ receive
+ {'DOWN', R, _, _, _} ->
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ unmonitor(R),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(Tag, [N|Tail], Name, Badnodes, Replies) ->
+ %% R6 node
+ receive
+ {nodedown, N} ->
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies);
+ {{Tag, N}, Reply} -> %% Tag is bound !!!
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, Badnodes, [{N,Reply}|Replies])
+ after 0 ->
+ receive {nodedown, N} -> ok after 0 -> ok end,
+ monitor_node(N, false),
+ rec_nodes_rest(Tag, Tail, Name, [N|Badnodes], Replies)
+ end;
+rec_nodes_rest(_Tag, [], _Name, Badnodes, Replies) ->
+ {Replies, Badnodes}.
+
+
+%%% ---------------------------------------------------
+%%% Monitor functions
+%%% ---------------------------------------------------
+
+start_monitor(Node, Name) when is_atom(Node), is_atom(Name) ->
+ if node() =:= nonode@nohost, Node =/= nonode@nohost ->
+ Ref = make_ref(),
+ self() ! {'DOWN', Ref, process, {Name, Node}, noconnection},
+ {Node, Ref};
+ true ->
+ case catch erlang:monitor(process, {Name, Node}) of
+ {'EXIT', _} ->
+ %% Remote node is R6
+ monitor_node(Node, true),
+ Node;
+ Ref when is_reference(Ref) ->
+ {Node, Ref}
+ end
+ end.
+
+%% Cancels a monitor started with Ref=erlang:monitor(_, _).
+unmonitor(Ref) when is_reference(Ref) ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, _, _, _} ->
+ true
+ after 0 ->
+ true
+ end.
+
+%%% ---------------------------------------------------
+%%% Message handling functions
+%%% ---------------------------------------------------
+
+dispatch({'$gen_cast', Msg}, Mod, State) ->
+ Mod:handle_cast(Msg, State);
+dispatch(Info, Mod, State) ->
+ Mod:handle_info(Info, State).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {reply, Reply, NState, Time1} ->
+ reply(From, Reply),
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, [])),
+ reply(From, Reply),
+ exit(R);
+ Other -> handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+
+handle_msg({'$gen_call', From, Msg},
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ case catch Mod:handle_call(Msg, From, State) of
+ {reply, Reply, NState} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {reply, Reply, NState, Time1} ->
+ Debug1 = reply(Name, From, Reply, NState, Debug),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, Reply, NState} ->
+ {'EXIT', R} =
+ (catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
+ reply(Name, From, Reply, NState, Debug),
+ exit(R);
+ Other ->
+ handle_common_reply(Other,
+ Parent, Name, Msg, Mod, State, Queue, Debug)
+ end;
+handle_msg(Msg,
+ Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Reply = (catch dispatch(Msg, Mod, State)),
+ handle_common_reply(Reply,
+ Parent, Name, Msg, Mod, State, Queue, Debug).
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+ case Reply of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ {noreply, NState, Time1} ->
+ loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, []);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ end.
+
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+ case Reply of
+ {noreply, NState} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ {noreply, NState, Time1} ->
+ Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {noreply, NState}),
+ loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, Msg, Mod, NState, Debug);
+ {'EXIT', What} ->
+ terminate(What, Name, Msg, Mod, State, Debug);
+ _ ->
+ terminate({bad_return_value, Reply}, Name, Msg, Mod, State, Debug)
+ end.
+
+reply(Name, {To, Tag}, Reply, State, Debug) ->
+ reply({To, Tag}, Reply),
+ sys:handle_debug(Debug, {?MODULE, print_event}, Name,
+ {out, Reply, To, State} ).
+
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, Queue, Debug).
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+ terminate(Reason, Name, [], Mod, State, Debug).
+
+system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+ case catch Mod:code_change(OldVsn, State, Extra) of
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+ Else -> Else
+ end.
+
+%%-----------------------------------------------------------------
+%% Format debug messages. Print them as the call-back module sees
+%% them, not as the real erlang messages. Use trace for that.
+%%-----------------------------------------------------------------
+print_event(Dev, {in, Msg}, Name) ->
+ case Msg of
+ {'$gen_call', {From, _Tag}, Call} ->
+ io:format(Dev, "*DBG* ~p got call ~p from ~w~n",
+ [Name, Call, From]);
+ {'$gen_cast', Cast} ->
+ io:format(Dev, "*DBG* ~p got cast ~p~n",
+ [Name, Cast]);
+ _ ->
+ io:format(Dev, "*DBG* ~p got ~p~n", [Name, Msg])
+ end;
+print_event(Dev, {out, Msg, To, State}, Name) ->
+ io:format(Dev, "*DBG* ~p sent ~p to ~w, new state ~w~n",
+ [Name, Msg, To, State]);
+print_event(Dev, {noreply, State}, Name) ->
+ io:format(Dev, "*DBG* ~p new state ~w~n", [Name, State]);
+print_event(Dev, Event, Name) ->
+ io:format(Dev, "*DBG* ~p dbg ~p~n", [Name, Event]).
+
+
+%%% ---------------------------------------------------
+%%% Terminate the server.
+%%% ---------------------------------------------------
+
+terminate(Reason, Name, Msg, Mod, State, Debug) ->
+ case catch Mod:terminate(Reason, State) of
+ {'EXIT', R} ->
+ error_info(R, Name, Msg, State, Debug),
+ exit(R);
+ _ ->
+ case Reason of
+ normal ->
+ exit(normal);
+ shutdown ->
+ exit(shutdown);
+ _ ->
+ error_info(Reason, Name, Msg, State, Debug),
+ exit(Reason)
+ end
+ end.
+
+error_info(_Reason, application_controller, _Msg, _State, _Debug) ->
+ %% OTP-5811 Don't send an error report if it's the system process
+ %% application_controller which is terminating - let init take care
+ %% of it instead
+ ok;
+error_info(Reason, Name, Msg, State, Debug) ->
+ Reason1 =
+ case Reason of
+ {undef,[{M,F,A}|MFAs]} ->
+ case code:is_loaded(M) of
+ false ->
+ {'module could not be loaded',[{M,F,A}|MFAs]};
+ _ ->
+ case erlang:function_exported(M, F, length(A)) of
+ true ->
+ Reason;
+ false ->
+ {'function not exported',[{M,F,A}|MFAs]}
+ end
+ end;
+ _ ->
+ Reason
+ end,
+ format("** Generic server ~p terminating \n"
+ "** Last message in was ~p~n"
+ "** When Server state == ~p~n"
+ "** Reason for termination == ~n** ~p~n",
+ [Name, Msg, State, Reason1]),
+ sys:print_log(Debug),
+ ok.
+
+%%% ---------------------------------------------------
+%%% Misc. functions.
+%%% ---------------------------------------------------
+
+opt(Op, [{Op, Value}|_]) ->
+ {ok, Value};
+opt(Op, [_|Options]) ->
+ opt(Op, Options);
+opt(_, []) ->
+ false.
+
+debug_options(Name, Opts) ->
+ case opt(debug, Opts) of
+ {ok, Options} -> dbg_options(Name, Options);
+ _ -> dbg_options(Name, [])
+ end.
+
+dbg_options(Name, []) ->
+ Opts =
+ case init:get_argument(generic_debug) of
+ error ->
+ [];
+ _ ->
+ [log, statistics]
+ end,
+ dbg_opts(Name, Opts);
+dbg_options(Name, Opts) ->
+ dbg_opts(Name, Opts).
+
+dbg_opts(Name, Opts) ->
+ case catch sys:debug_options(Opts) of
+ {'EXIT',_} ->
+ format("~p: ignoring erroneous debug options - ~p~n",
+ [Name, Opts]),
+ [];
+ Dbg ->
+ Dbg
+ end.
+
+get_proc_name(Pid) when is_pid(Pid) ->
+ Pid;
+get_proc_name({local, Name}) ->
+ case process_info(self(), registered_name) of
+ {registered_name, Name} ->
+ Name;
+ {registered_name, _Name} ->
+ exit(process_not_registered);
+ [] ->
+ exit(process_not_registered)
+ end;
+get_proc_name({global, Name}) ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(process_not_registered_globally);
+ Pid when Pid =:= self() ->
+ Name;
+ _Pid ->
+ exit(process_not_registered_globally)
+ end.
+
+get_parent() ->
+ case get('$ancestors') of
+ [Parent | _] when is_pid(Parent)->
+ Parent;
+ [Parent | _] when is_atom(Parent)->
+ name_to_pid(Parent);
+ _ ->
+ exit(process_was_not_started_by_proc_lib)
+ end.
+
+name_to_pid(Name) ->
+ case whereis(Name) of
+ undefined ->
+ case global:safe_whereis_name(Name) of
+ undefined ->
+ exit(could_not_find_registerd_name);
+ Pid ->
+ Pid
+ end;
+ Pid ->
+ Pid
+ end.
+
+%%-----------------------------------------------------------------
+%% Status information
+%%-----------------------------------------------------------------
+format_status(Opt, StatusData) ->
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ StatusData,
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for generic server ", NameTag]),
+ Log = sys:get_debug(log, Debug, []),
+ Specfic =
+ case erlang:function_exported(Mod, format_status, 2) of
+ true ->
+ case catch Mod:format_status(Opt, [PDict, State]) of
+ {'EXIT', _} -> [{data, [{"State", State}]}];
+ Else -> Else
+ end;
+ _ ->
+ [{data, [{"State", State}]}]
+ end,
+ [{header, Header},
+ {data, [{"Status", SysState},
+ {"Parent", Parent},
+ {"Logged events", Log},
+ {"Queued messages", priority_queue:to_list(Queue)}]} |
+ Specfic].
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
new file mode 100644
index 00000000..732757c4
--- /dev/null
+++ b/src/priority_queue.erl
@@ -0,0 +1,153 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+%% Priority queues have essentially the same interface as ordinary
+%% queues, except that a) there is an in/3 that takes a priority, and
+%% b) we have only implemented the core API we need.
+%%
+%% Priorities should be integers - the higher the value the higher the
+%% priority - but we don't actually check that.
+%%
+%% in/2 inserts items with priority 0.
+%%
+%% We optimise the case where a priority queue is being used just like
+%% an ordinary queue. When that is the case we represent the priority
+%% queue as an ordinary queue. We could just call into the 'queue'
+%% module for that, but for efficiency we implement the relevant
+%% functions directly in here, thus saving on inter-module calls and
+%% eliminating a level of boxing.
+%%
+%% When the queue contains items with non-zero priorities, it is
+%% represented as a sorted kv list with the inverted Priority as the
+%% key and an ordinary queue as the value. Here again we use our own
+%% ordinary queue implemention for efficiency, often making recursive
+%% calls into the same function knowing that ordinary queues represent
+%% a base case.
+
+
+-module(priority_queue).
+
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(priority() :: integer()).
+-type(squeue() :: {queue, [any()], [any()]}).
+-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
+
+-spec(new/0 :: () -> pqueue()).
+-spec(is_queue/1 :: (any()) -> bool()).
+-spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(len/1 :: (pqueue()) -> non_neg_integer()).
+-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
+-spec(in/2 :: (any(), pqueue()) -> pqueue()).
+-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
+-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+new() ->
+ {queue, [], []}.
+
+is_queue({queue, R, F}) when is_list(R), is_list(F) ->
+ true;
+is_queue({pqueue, Queues}) when is_list(Queues) ->
+ lists:all(fun ({P, Q}) -> is_integer(P) andalso is_queue(Q) end,
+ Queues);
+is_queue(_) ->
+ false.
+
+is_empty({queue, [], []}) ->
+ true;
+is_empty(_) ->
+ false.
+
+len({queue, R, F}) when is_list(R), is_list(F) ->
+ length(R) + length(F);
+len({pqueue, Queues}) ->
+ lists:sum([len(Q) || {_, Q} <- Queues]).
+
+to_list({queue, In, Out}) when is_list(In), is_list(Out) ->
+ [{0, V} || V <- Out ++ lists:reverse(In, [])];
+to_list({pqueue, Queues}) ->
+ [{-P, V} || {P, Q} <- Queues, {0, V} <- to_list(Q)].
+
+in(Item, Q) ->
+ in(Item, 0, Q).
+
+in(X, 0, {queue, [_] = In, []}) ->
+ {queue, [X], In};
+in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) ->
+ {queue, [X|In], Out};
+in(X, Priority, _Q = {queue, [], []}) ->
+ in(X, Priority, {pqueue, []});
+in(X, Priority, Q = {queue, _, _}) ->
+ in(X, Priority, {pqueue, [{0, Q}]});
+in(X, Priority, {pqueue, Queues}) ->
+ P = -Priority,
+ {pqueue, case lists:keysearch(P, 1, Queues) of
+ {value, {_, Q}} ->
+ lists:keyreplace(P, 1, Queues, {P, in(X, Q)});
+ false ->
+ lists:keysort(1, [{P, {queue, [X], []}} | Queues])
+ end}.
+
+out({queue, [], []} = Q) ->
+ {empty, Q};
+out({queue, [V], []}) ->
+ {{value, V}, {queue, [], []}};
+out({queue, [Y|In], []}) ->
+ [V|Out] = lists:reverse(In, []),
+ {{value, V}, {queue, [Y], Out}};
+out({queue, In, [V]}) when is_list(In) ->
+ {{value,V}, r2f(In)};
+out({queue, In,[V|Out]}) when is_list(In) ->
+ {{value, V}, {queue, In, Out}};
+out({pqueue, [{P, Q} | Queues]}) ->
+ {R, Q1} = out(Q),
+ NewQ = case is_empty(Q1) of
+ true -> case Queues of
+ [] -> {queue, [], []};
+ [{0, OnlyQ}] -> OnlyQ;
+ [_|_] -> {pqueue, Queues}
+ end;
+ false -> {pqueue, [{P, Q1} | Queues]}
+ end,
+ {R, NewQ}.
+
+r2f([]) -> {queue, [], []};
+r2f([_] = R) -> {queue, [], R};
+r2f([X,Y]) -> {queue, [X], [Y]};
+r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 97bbdd99..0de93e99 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -75,19 +75,20 @@ start() ->
try
ok = ensure_working_log_handlers(),
ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = start_applications(?APPS)
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
- ok = stop_applications(?APPS).
+ ok = rabbit_misc:stop_applications(?APPS).
stop_and_halt() ->
spawn(fun () ->
SleepTime = 1000,
- rabbit_log:info("Stop-and-halt request received; halting in ~p milliseconds~n",
+ rabbit_log:info("Stop-and-halt request received; "
+ "halting in ~p milliseconds~n",
[SleepTime]),
timer:sleep(SleepTime),
init:stop()
@@ -109,34 +110,6 @@ rotate_logs(BinarySuffix) ->
%%--------------------------------------------------------------------
-manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
- Iterate(fun (App, Acc) ->
- case Do(App) of
- ok -> [App | Acc];
- {error, {SkipError, _}} -> Acc;
- {error, Reason} ->
- lists:foreach(Undo, Acc),
- throw({error, {ErrorTag, App, Reason}})
- end
- end, [], Apps),
- ok.
-
-start_applications(Apps) ->
- manage_applications(fun lists:foldl/3,
- fun application:start/1,
- fun application:stop/1,
- already_started,
- cannot_start_application,
- Apps).
-
-stop_applications(Apps) ->
- manage_applications(fun lists:foldr/3,
- fun application:stop/1,
- fun application:start/1,
- not_started,
- cannot_stop_application,
- Apps).
-
start(normal, []) ->
{ok, SupPid} = rabbit_sup:start_link(),
@@ -296,9 +269,14 @@ insert_default_data() ->
{ok, DefaultUser} = application:get_env(default_user),
{ok, DefaultPass} = application:get_env(default_pass),
{ok, DefaultVHost} = application:get_env(default_vhost),
+ {ok, [DefaultConfigurePerm, DefaultWritePerm, DefaultReadPerm]} =
+ application:get_env(default_permissions),
ok = rabbit_access_control:add_vhost(DefaultVHost),
ok = rabbit_access_control:add_user(DefaultUser, DefaultPass),
- ok = rabbit_access_control:map_user_vhost(DefaultUser, DefaultVHost),
+ ok = rabbit_access_control:set_permissions(DefaultUser, DefaultVHost,
+ DefaultConfigurePerm,
+ DefaultWritePerm,
+ DefaultReadPerm),
ok.
start_builtin_amq_applications() ->
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b73090fc..54348d9a 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -34,11 +34,12 @@
-include("rabbit.hrl").
-export([check_login/2, user_pass_login/2,
- check_vhost_access/2]).
+ check_vhost_access/2, check_resource_access/3]).
-export([add_user/2, delete_user/1, change_password/2, list_users/0,
lookup_user/1]).
--export([add_vhost/1, delete_vhost/1, list_vhosts/0, list_vhost_users/1]).
--export([list_user_vhosts/1, map_user_vhost/2, unmap_user_vhost/2]).
+-export([add_vhost/1, delete_vhost/1, list_vhosts/0]).
+-export([set_permissions/5, clear_permissions/2,
+ list_vhost_permissions/1, list_user_permissions/1]).
%%----------------------------------------------------------------------------
@@ -47,6 +48,8 @@
-spec(check_login/2 :: (binary(), binary()) -> user()).
-spec(user_pass_login/2 :: (username(), password()) -> user()).
-spec(check_vhost_access/2 :: (user(), vhost()) -> 'ok').
+-spec(check_resource_access/3 ::
+ (username(), r(atom()), non_neg_integer()) -> 'ok').
-spec(add_user/2 :: (username(), password()) -> 'ok').
-spec(delete_user/1 :: (username()) -> 'ok').
-spec(change_password/2 :: (username(), password()) -> 'ok').
@@ -55,10 +58,13 @@
-spec(add_vhost/1 :: (vhost()) -> 'ok').
-spec(delete_vhost/1 :: (vhost()) -> 'ok').
-spec(list_vhosts/0 :: () -> [vhost()]).
--spec(list_vhost_users/1 :: (vhost()) -> [username()]).
--spec(list_user_vhosts/1 :: (username()) -> [vhost()]).
--spec(map_user_vhost/2 :: (username(), vhost()) -> 'ok').
--spec(unmap_user_vhost/2 :: (username(), vhost()) -> 'ok').
+-spec(set_permissions/5 ::
+ (username(), vhost(), regexp(), regexp(), regexp()) -> 'ok').
+-spec(clear_permissions/2 :: (username(), vhost()) -> 'ok').
+-spec(list_vhost_permissions/1 ::
+ (vhost()) -> [{username(), regexp(), regexp(), regexp()}]).
+-spec(list_user_permissions/1 ::
+ (username()) -> [{vhost(), regexp(), regexp(), regexp()}]).
-endif.
@@ -112,9 +118,9 @@ internal_lookup_vhost_access(Username, VHostPath) ->
%% TODO: use dirty ops instead
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:match_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath}) of
+ case mnesia:read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
[] -> not_found;
[R] -> {ok, R}
end
@@ -131,13 +137,47 @@ check_vhost_access(#user{username = Username}, VHostPath) ->
[VHostPath, Username])
end.
+check_resource_access(Username,
+ R = #resource{kind = exchange, name = <<"">>},
+ Permission) ->
+ check_resource_access(Username,
+ R#resource{name = <<"amq.default">>},
+ Permission);
+check_resource_access(_Username,
+ #resource{name = <<"amq.gen",_/binary>>},
+ _Permission) ->
+ ok;
+check_resource_access(Username,
+ R = #resource{virtual_host = VHostPath, name = Name},
+ Permission) ->
+ Res = case mnesia:dirty_read({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}}) of
+ [] ->
+ false;
+ [#user_permission{permission = P}] ->
+ case regexp:match(
+ binary_to_list(Name),
+ binary_to_list(element(Permission, P))) of
+ {match, _, _} -> true;
+ nomatch -> false
+ end
+ end,
+ if Res -> ok;
+ true -> rabbit_misc:protocol_error(
+ access_refused, "access to ~s refused for user '~s'",
+ [rabbit_misc:rs(R), Username])
+ end.
+
add_user(Username, Password) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({user, Username}) of
+ case mnesia:wread({rabbit_user, Username}) of
[] ->
- ok = mnesia:write(#user{username = Username,
- password = Password});
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write);
_ ->
mnesia:abort({user_already_exists, Username})
end
@@ -150,8 +190,17 @@ delete_user(Username) ->
rabbit_misc:with_user(
Username,
fun () ->
- ok = mnesia:delete({user, Username}),
- ok = mnesia:delete({user_vhost, Username})
+ ok = mnesia:delete({rabbit_user, Username}),
+ [ok = mnesia:delete_object(
+ rabbit_user_permission, R, write) ||
+ R <- mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = '_'},
+ permission = '_'},
+ write)],
+ ok
end)),
rabbit_log:info("Deleted user ~p~n", [Username]),
R.
@@ -161,24 +210,28 @@ change_password(Username, Password) ->
rabbit_misc:with_user(
Username,
fun () ->
- ok = mnesia:write(#user{username = Username,
- password = Password})
+ ok = mnesia:write(rabbit_user,
+ #user{username = Username,
+ password = Password},
+ write)
end)),
rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
list_users() ->
- mnesia:dirty_all_keys(user).
+ mnesia:dirty_all_keys(rabbit_user).
lookup_user(Username) ->
- rabbit_misc:dirty_read({user, Username}).
+ rabbit_misc:dirty_read({rabbit_user, Username}).
add_vhost(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:read({vhost, VHostPath}) of
+ case mnesia:wread({rabbit_vhost, VHostPath}) of
[] ->
- ok = mnesia:write(#vhost{virtual_host = VHostPath}),
+ ok = mnesia:write(rabbit_vhost,
+ #vhost{virtual_host = VHostPath},
+ write),
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, []) ||
@@ -186,6 +239,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
@@ -218,53 +273,79 @@ internal_delete_vhost(VHostPath) ->
ok = rabbit_exchange:delete(Name, false)
end,
rabbit_exchange:list(VHostPath)),
- lists:foreach(fun (Username) ->
- ok = unmap_user_vhost(Username, VHostPath)
+ lists:foreach(fun ({Username, _, _, _}) ->
+ ok = clear_permissions(Username, VHostPath)
end,
- list_vhost_users(VHostPath)),
- ok = mnesia:delete({vhost, VHostPath}),
+ list_vhost_permissions(VHostPath)),
+ ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.
list_vhosts() ->
- mnesia:dirty_all_keys(vhost).
+ mnesia:dirty_all_keys(rabbit_vhost).
-list_vhost_users(VHostPath) ->
- [Username ||
- #user_vhost{username = Username} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_vhost(
- VHostPath,
- fun () -> mnesia:index_read(user_vhost, VHostPath,
- #user_vhost.virtual_host)
- end))].
-
-list_user_vhosts(Username) ->
- [VHostPath ||
- #user_vhost{virtual_host = VHostPath} <-
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- rabbit_misc:with_user(
- Username,
- fun () -> mnesia:read({user_vhost, Username}) end))].
+validate_regexp(RegexpBin) ->
+ Regexp = binary_to_list(RegexpBin),
+ case regexp:parse(Regexp) of
+ {ok, _} -> ok;
+ {error, Reason} -> throw({error, {invalid_regexp, Regexp, Reason}})
+ end.
-map_user_vhost(Username, VHostPath) ->
+set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
- fun () ->
- ok = mnesia:write(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ fun () -> ok = mnesia:write(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}},
+ write)
end)).
-unmap_user_vhost(Username, VHostPath) ->
+clear_permissions(Username, VHostPath) ->
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
Username, VHostPath,
fun () ->
- ok = mnesia:delete_object(
- #user_vhost{username = Username,
- virtual_host = VHostPath})
+ ok = mnesia:delete({rabbit_user_permission,
+ #user_vhost{username = Username,
+ virtual_host = VHostPath}})
end)).
+list_vhost_permissions(VHostPath) ->
+ [{Username, ConfigurePerm, WritePerm, ReadPerm} ||
+ {Username, _, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_vhost(
+ VHostPath, match_user_vhost('_', VHostPath)))].
+
+list_user_permissions(Username) ->
+ [{VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ {_, VHostPath, ConfigurePerm, WritePerm, ReadPerm} <-
+ list_permissions(rabbit_misc:with_user(
+ Username, match_user_vhost(Username, '_')))].
+
+list_permissions(QueryThunk) ->
+ [{Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm} ||
+ #user_permission{user_vhost = #user_vhost{username = Username,
+ virtual_host = VHostPath},
+ permission = #permission{
+ configure = ConfigurePerm,
+ write = WritePerm,
+ read = ReadPerm}} <-
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(QueryThunk)].
+
+match_user_vhost(Username, VHostPath) ->
+ fun () -> mnesia:match_object(
+ rabbit_user_permission,
+ #user_permission{user_vhost = #user_vhost{
+ username = Username,
+ virtual_host = VHostPath},
+ permission = '_'},
+ read)
+ end.
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index dee71d23..21999f16 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -53,7 +53,7 @@
-spec(start/1 :: (bool() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
-
+
-endif.
%%----------------------------------------------------------------------------
@@ -78,7 +78,8 @@ stop() ->
register(Pid, HighMemMFA) ->
ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA}).
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -101,7 +102,7 @@ handle_call({register, Pid, HighMemMFA},
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
{ok, ok, State#alarms{alertees = NewAlertees}};
-
+
handle_call(_Request, State) ->
{ok, not_understood, State}.
@@ -135,7 +136,7 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
+ Mod = case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
@@ -143,7 +144,7 @@ start_memsup() ->
%% our own simple memory monitor.
%%
{unix, linux} -> rabbit_memsup_linux;
-
+
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 382810c3..eb076e94 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,13 +37,13 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
--import(gen_server).
+-import(gen_server2).
-import(lists).
-import(queue).
@@ -91,15 +91,17 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -130,7 +132,7 @@ recover_durable_queues() ->
%% re-created it).
case rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:match_object(
- durable_queues, RecoveredQ, read) of
+ rabbit_durable_queue, RecoveredQ, read) of
[_] -> ok = store_queue(Q),
true;
[] -> false
@@ -144,7 +146,7 @@ recover_durable_queues() ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
+ <- mnesia:table(rabbit_durable_queue),
node(Pid) == Node]))
end)),
ok.
@@ -157,7 +159,7 @@ declare(QueueName, Durable, AutoDelete, Args) ->
pid = none}),
case rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> ok = store_queue(Q),
ok = add_default_binding(Q),
Q;
@@ -170,11 +172,11 @@ declare(QueueName, Durable, AutoDelete, Args) ->
end.
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(durable_queues, Q, write),
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
- ok = mnesia:write(Q),
+ ok = mnesia:write(rabbit_queue, Q, write),
ok.
start_queue_process(Q) ->
@@ -188,7 +190,7 @@ add_default_binding(#amqqueue{name = QueueName}) ->
ok.
lookup(Name) ->
- rabbit_misc:dirty_read({amqqueue, Name}).
+ rabbit_misc:dirty_read({rabbit_queue, Name}).
with(Name, F, E) ->
case lookup(Name) of
@@ -205,15 +207,16 @@ with_or_die(Name, F) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_queue,
#amqqueue{name = rabbit_misc:r(VHostPath, queue), _ = '_'}).
map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)).
info(#amqqueue{ pid = QPid }) ->
- gen_server:call(QPid, info).
+ gen_server2:pcall(QPid, 9, info, infinity).
info(#amqqueue{ pid = QPid }, Items) ->
- case gen_server:call(QPid, {info, Items}) of
+ case gen_server2:pcall(QPid, 9, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -222,82 +225,91 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
-stat(#amqqueue{pid = QPid}) -> gen_server:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
- lists:map(fun stat/1, rabbit_misc:dirty_read_all(amqqueue)).
+ lists:map(fun stat/1, rabbit_misc:dirty_read_all(rabbit_queue)).
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
- gen_server:call(QPid, {delete, IfUnused, IfEmpty}).
+ gen_server2:call(QPid, {delete, IfUnused, IfEmpty}, infinity).
-purge(#amqqueue{ pid = QPid }) -> gen_server:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) -> gen_server2:call(QPid, purge, infinity).
deliver(_IsMandatory, true, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver_immediately, Txn, Message});
+ gen_server2:call(QPid, {deliver_immediately, Txn, Message}, infinity);
deliver(true, _IsImmediate, Txn, Message, QPid) ->
- gen_server:call(QPid, {deliver, Txn, Message}),
+ gen_server2:call(QPid, {deliver, Txn, Message}, infinity),
true;
deliver(false, _IsImmediate, Txn, Message, QPid) ->
- gen_server:cast(QPid, {deliver, Txn, Message}),
+ gen_server2:cast(QPid, {deliver, Txn, Message}),
true.
redeliver(QPid, Messages) ->
- gen_server:cast(QPid, {redeliver, Messages}).
+ gen_server2:cast(QPid, {redeliver, Messages}).
requeue(QPid, MsgIds, ChPid) ->
- gen_server:cast(QPid, {requeue, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, Txn, MsgIds, ChPid) ->
- gen_server:cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ gen_server2:cast(QPid, {ack, Txn, MsgIds, ChPid}).
commit_all(QPids, Txn) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:call(QPid, {commit, Txn}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {commit, Txn}, infinity) end,
QPids).
rollback_all(QPids, Txn) ->
safe_pmap_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
- fun (QPid) -> gen_server:cast(QPid, {rollback, Txn}) end,
+ fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn}) end,
QPids).
notify_down_all(QPids, ChPid) ->
- Timeout = length(QPids) * ?CALL_TIMEOUT,
safe_pmap_ok(
%% we don't care if the queue process has terminated in the
%% meantime
fun (_) -> ok end,
- fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
+ fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
- gen_server:call(QPid, {claim_queue, ReaderPid}).
+ gen_server2:call(QPid, {claim_queue, ReaderPid}, infinity).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- gen_server:call(QPid, {basic_get, ChPid, NoAck}).
+ gen_server2:call(QPid, {basic_get, ChPid, NoAck}, infinity).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server2:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg},
+ infinity).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
- ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
+ ok = gen_server2:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg},
+ infinity).
notify_sent(QPid, ChPid) ->
- gen_server:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:cast(QPid, {notify_sent, ChPid}).
+
+unblock(QPid, ChPid) ->
+ gen_server2:cast(QPid, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({amqqueue, QueueName}) of
+ case mnesia:wread({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[_] ->
ok = rabbit_exchange:delete_queue_bindings(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok = mnesia:delete({durable_queues, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
+ ok = mnesia:delete({rabbit_durable_queue, QueueName}),
ok
end
end).
@@ -309,12 +321,12 @@ on_node_down(Node) ->
fun (QueueName, Acc) ->
ok = rabbit_exchange:delete_transient_queue_bindings(
QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
+ ok = mnesia:delete({rabbit_queue, QueueName}),
Acc
end,
ok,
qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
- <- mnesia:table(amqqueue),
+ <- mnesia:table(rabbit_queue),
node(Pid) == Node]))
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6282a8fb..c390b2b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
@@ -62,9 +62,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
start_link(Q) ->
- gen_server:start_link(?MODULE, Q, []).
+ gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
@@ -131,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
+ true ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
+ false ->
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
attempt_delivery(none, Message, State) ->
@@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
@@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) ->
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -500,8 +514,8 @@ i(messages_uncommitted, _) ->
#tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
+ messages_unacknowledged,
+ messages_uncommitted]]);
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
@@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) ->
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
handle_ch_down(ChPid, State);
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]);
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5fd9a512..b2716ec4 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -33,23 +33,29 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/4, do/2, do/3, shutdown/1]).
+-behaviour(gen_server2).
+
+-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
-%% callbacks
--export([init/2, handle_message/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, proxy_pid, reader_pid, writer_pid,
+-record(ch, {state, channel, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
most_recently_declared_queue, consumer_mapping}).
+-define(HIBERNATE_AFTER, 1000).
+
+-define(MAX_PERMISSION_CACHE_SIZE, 12).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/4 :: (pid(), pid(), username(), vhost()) -> pid()).
+-spec(start_link/5 ::
+ (channel_number(), pid(), pid(), username(), vhost()) -> pid()).
-spec(do/2 :: (pid(), amqp_method()) -> 'ok').
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
@@ -61,112 +67,126 @@
%%----------------------------------------------------------------------------
-start_link(ReaderPid, WriterPid, Username, VHost) ->
- buffering_proxy:start_link(?MODULE, [ReaderPid, WriterPid,
- Username, VHost]).
+start_link(Channel, ReaderPid, WriterPid, Username, VHost) ->
+ {ok, Pid} = gen_server2:start_link(
+ ?MODULE, [Channel, ReaderPid, WriterPid,
+ Username, VHost], []),
+ Pid.
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- Pid ! {method, Method, Content},
- ok.
+ gen_server2:cast(Pid, {method, Method, Content}).
shutdown(Pid) ->
- Pid ! terminate,
- ok.
+ gen_server2:cast(Pid, terminate).
send_command(Pid, Msg) ->
- Pid ! {command, Msg},
- ok.
+ gen_server2:cast(Pid, {command, Msg}).
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
- Pid ! {deliver, ConsumerTag, AckRequired, Msg},
- ok.
+ gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
- ok.
+ gen_server2:cast(Pid, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
-init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
+init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
process_flag(trap_exit, true),
link(WriterPid),
- %% this is bypassing the proxy so alarms can "jump the queue" and
- %% be handled promptly
rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- #ch{state = starting,
- proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
- writer_pid = WriterPid,
- transaction_id = none,
- tx_participants = sets:new(),
- next_tag = 1,
- uncommitted_ack_q = queue:new(),
- unacked_message_q = queue:new(),
- username = Username,
- virtual_host = VHost,
- most_recently_declared_queue = <<>>,
- consumer_mapping = dict:new()}.
-
-handle_message({method, Method, Content}, State) ->
+ {ok, #ch{state = starting,
+ channel = Channel,
+ reader_pid = ReaderPid,
+ writer_pid = WriterPid,
+ limiter_pid = undefined,
+ transaction_id = none,
+ tx_participants = sets:new(),
+ next_tag = 1,
+ uncommitted_ack_q = queue:new(),
+ unacked_message_q = queue:new(),
+ username = Username,
+ virtual_host = VHost,
+ most_recently_declared_queue = <<>>,
+ consumer_mapping = dict:new()}}.
+
+handle_call(_Request, _From, State) ->
+ noreply(State).
+
+handle_cast({method, Method, Content}, State) ->
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
- NewState;
+ noreply(NewState);
{noreply, NewState} ->
- NewState;
+ noreply(NewState);
stop ->
- exit(normal)
+ {stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- terminate({amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State);
+ ok = notify_queues(internal_rollback(State)),
+ Reason = {amqp, Error, Explanation,
+ rabbit_misc:method_record_type(Method)},
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State#ch{state = terminating}};
exit:normal ->
- terminate(normal, State);
+ {stop, normal, State};
_:Reason ->
- terminate({Reason, erlang:get_stacktrace()}, State)
+ {stop, {Reason, erlang:get_stacktrace()}, State}
end;
-handle_message(terminate, State) ->
- terminate(normal, State);
+handle_cast(terminate, State) ->
+ {stop, normal, State};
-handle_message({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
+handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
- State;
+ noreply(State);
-handle_message({deliver, ConsumerTag, AckRequired, Msg},
- State = #ch{proxy_pid = ProxyPid,
- writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
- ok = internal_deliver(WriterPid, ProxyPid,
- true, ConsumerTag, DeliveryTag, Msg),
- State1#ch{next_tag = DeliveryTag + 1};
+ ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_message({conserve_memory, Conserve}, State) ->
+handle_cast({conserve_memory, Conserve}, State) ->
+ ok = clear_permission_cache(),
ok = rabbit_writer:send_command(
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- State;
+ noreply(State).
-handle_message({'EXIT', _Pid, Reason}, State) ->
- terminate(Reason, State);
+handle_info({'EXIT', _Pid, Reason}, State) ->
+ {stop, Reason, State};
-handle_message(Other, State) ->
- terminate({unexpected_channel_message, Other}, State).
+handle_info(timeout, State) ->
+ ok = clear_permission_cache(),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
-%%---------------------------------------------------------------------------
+terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ state = terminating}) ->
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid);
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
rabbit_writer:shutdown(WriterPid),
- exit(Reason).
+ rabbit_limiter:shutdown(LimiterPid).
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------------
+
+noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -190,6 +210,35 @@ return_queue_declare_ok(State, NoWait, Q) ->
{reply, Reply, NewState}
end.
+check_resource_access(Username, Resource, Perm) ->
+ V = {Resource, Perm},
+ Cache = case get(permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+ CacheTail =
+ case lists:member(V, Cache) of
+ true -> lists:delete(V, Cache);
+ false -> ok = rabbit_access_control:check_resource_access(
+ Username, Resource, Perm),
+ lists:sublist(Cache, ?MAX_PERMISSION_CACHE_SIZE - 1)
+ end,
+ put(permission_cache, [V | CacheTail]),
+ ok.
+
+clear_permission_cache() ->
+ erase(permission_cache),
+ ok.
+
+check_configure_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.configure).
+
+check_write_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.write).
+
+check_read_permitted(Resource, #ch{ username = Username}) ->
+ check_resource_access(Username, Resource, #permission.read).
+
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
not_allowed, "no previously declared queue", []);
@@ -248,7 +297,6 @@ handle_method(_Method, _, #ch{state = starting}) ->
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
- ok = rabbit_writer:shutdown(WriterPid),
stop;
handle_method(#'access.request'{},_, State) ->
@@ -260,6 +308,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{ virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
@@ -273,7 +322,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -286,9 +335,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
true -> ok
end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
+ Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -299,12 +349,13 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{ proxy_pid = ProxyPid, writer_pid = WriterPid,
+ _, State = #ch{ writer_pid = WriterPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, ProxyPid, NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
@@ -330,12 +381,13 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- reader_pid = ReaderPid,
+ _, State = #ch{ reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binstring_guid("amq.ctag");
@@ -349,7 +401,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -380,8 +432,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
nowait = NoWait},
- _, State = #ch{ proxy_pid = ProxyPid,
- consumer_mapping = ConsumerMapping }) ->
+ _, State = #ch{consumer_mapping = ConsumerMapping }) ->
OkMsg = #'basic.cancel_ok'{consumer_tag = ConsumerTag},
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -402,7 +453,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
%% cancel_ok ourselves it might overtake a
%% message sent previously by the queue.
rabbit_amqqueue:basic_cancel(
- Q, ProxyPid, ConsumerTag,
+ Q, self(), ConsumerTag,
ok_msg(NoWait, #'basic.cancel_ok'{
consumer_tag = ConsumerTag}))
end) of
@@ -414,13 +465,34 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
- {reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter_pid = LimiterPid }) ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self()),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
unacked_message_q = UAMQ }) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -429,14 +501,13 @@ handle_method(#'basic.recover'{requeue = true},
%% order. To keep it happy we reverse the id list
%% since we are given them in reverse order.
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), ProxyPid)
+ QPid, lists:reverse(MsgIds), self())
end, ok, UAMQ),
%% No answer required, apparently!
{noreply, State#ch{unacked_message_q = queue:new()}};
handle_method(#'basic.recover'{requeue = false},
_, State = #ch{ transaction_id = none,
- proxy_pid = ProxyPid,
writer_pid = WriterPid,
unacked_message_q = UAMQ }) ->
lists:foreach(
@@ -454,8 +525,7 @@ handle_method(#'basic.recover'{requeue = false},
%%
%% FIXME: should we allocate a fresh DeliveryTag?
ok = internal_deliver(
- WriterPid, ProxyPid,
- false, ConsumerTag, DeliveryTag,
+ WriterPid, false, ConsumerTag, DeliveryTag,
{QName, QPid, MsgId, true, Message})
end, queue:to_list(UAMQ)),
%% No answer required, apparently!
@@ -476,6 +546,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
_, State = #ch{ virtual_host = VHostPath }) ->
CheckedType = rabbit_exchange:check_type(TypeNameBin),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = case rabbit_exchange:lookup(ExchangeName) of
{ok, FoundX} -> FoundX;
{error, not_found} ->
@@ -495,6 +566,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
X = rabbit_exchange:lookup_or_die(ExchangeName),
ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -504,6 +576,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch { virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
rabbit_misc:protocol_error(
@@ -554,9 +627,12 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ check_configure_permitted(QueueName, State),
Finish(rabbit_amqqueue:declare(QueueName,
Durable, AutoDelete, Args));
- Other -> Other
+ Other = #amqqueue{name = QueueName} ->
+ check_configure_permitted(QueueName, State),
+ Other
end,
return_queue_declare_ok(State, NoWait, Q);
@@ -565,6 +641,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
+ check_configure_permitted(QueueName, State),
Q = rabbit_amqqueue:with_or_die(QueueName, fun (Q) -> Q end),
return_queue_declare_ok(State, NoWait, Q);
@@ -575,6 +652,7 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_configure_permitted(QueueName, State),
case rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
@@ -611,6 +689,7 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
nowait = NoWait},
_, State) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_read_permitted(QueueName, State),
{ok, PurgedMessageCount} = rabbit_amqqueue:with_or_die(
QueueName,
fun (Q) -> rabbit_amqqueue:purge(Q) end),
@@ -660,9 +739,11 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
%% FIXME: don't allow binding to internal exchanges -
%% including the one named "" !
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
+ check_write_permitted(QueueName, State),
ActualRoutingKey = expand_routing_key_shortcut(QueueNameBin, RoutingKey,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
+ check_read_permitted(ExchangeName, State),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
{error, exchange_not_found} ->
rabbit_misc:protocol_error(
@@ -748,10 +829,10 @@ add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
State#ch{tx_participants = sets:union(Participants,
sets:from_list(MoreP))}.
-ack(ProxyPid, TxnKey, UAQ) ->
+ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, ProxyPid),
+ ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
[QPid | L]
end, [], UAQ).
@@ -766,7 +847,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -803,19 +886,37 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-notify_queues(#ch{proxy_pid = ProxyPid, consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- ProxyPid).
+notify_queues(#ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+
+limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(undefined, _Acked) ->
+ ok;
+notify_limiter(LimiterPid, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
@@ -823,7 +924,8 @@ is_message_persistent(#content{properties = #'P_basic'{
1 -> false;
2 -> true;
undefined -> false;
- Other -> rabbit_log:warning("Unknown delivery mode ~p - treating as 1, non-persistent~n",
+ Other -> rabbit_log:warning("Unknown delivery mode ~p - "
+ "treating as 1, non-persistent~n",
[Other]),
false
end.
@@ -833,7 +935,7 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
+internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
{_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -845,6 +947,6 @@ internal_deliver(WriterPid, ChPid, Notify, ConsumerTag, DeliveryTag,
routing_key = RoutingKey},
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, ChPid, M, Content);
+ WriterPid, QPid, self(), M, Content);
false -> rabbit_writer:send_command(WriterPid, M, Content)
end.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 352d7e75..6649899a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -127,10 +127,10 @@ Available commands:
delete_vhost <VHostPath>
list_vhosts
- map_user_vhost <UserName> <VHostPath>
- unmap_user_vhost <UserName> <VHostPath>
- list_user_vhosts <UserName>
- list_vhost_users <VHostPath>
+ set_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>
+ clear_permissions [-p <VHostPath>] <UserName>
+ list_permissions [-p <VHostPath>]
+ list_user_permissions <UserName>
list_queues [-p <VHostPath>] [<QueueInfoItem> ...]
list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]
@@ -236,25 +236,14 @@ action(list_vhosts, Node, [], Inform) ->
Inform("Listing vhosts", []),
display_list(call(Node, {rabbit_access_control, list_vhosts, []}));
-action(map_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
- Inform("Mapping user ~p to vhost ~p", Args),
- call(Node, {rabbit_access_control, map_user_vhost, Args});
-
-action(unmap_user_vhost, Node, Args = [_Username, _VHostPath], Inform) ->
- Inform("Unmapping user ~p from vhost ~p", Args),
- call(Node, {rabbit_access_control, unmap_user_vhost, Args});
-
-action(list_user_vhosts, Node, Args = [_Username], Inform) ->
- Inform("Listing vhosts for user ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_user_vhosts, Args}));
-
-action(list_vhost_users, Node, Args = [_VHostPath], Inform) ->
- Inform("Listing users for vhosts ~p", Args),
- display_list(call(Node, {rabbit_access_control, list_vhost_users, Args}));
+action(list_user_permissions, Node, Args = [_Username], Inform) ->
+ Inform("Listing permissions for user ~p", Args),
+ display_list(call(Node, {rabbit_access_control, list_user_permissions,
+ Args}));
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
ArgAtoms = list_replace(node, pid,
default_if_empty(RemainingArgs, [name, messages])),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
@@ -263,7 +252,7 @@ action(list_queues, Node, Args, Inform) ->
action(list_exchanges, Node, Args, Inform) ->
Inform("Listing exchanges", []),
- {VHostArg, RemainingArgs} = parse_vhost_flag(Args),
+ {VHostArg, RemainingArgs} = parse_vhost_flag_bin(Args),
ArgAtoms = default_if_empty(RemainingArgs, [name, type]),
display_info_list(rpc_call(Node, rabbit_exchange, info_all,
[VHostArg, ArgAtoms]),
@@ -271,7 +260,7 @@ action(list_exchanges, Node, Args, Inform) ->
action(list_bindings, Node, Args, Inform) ->
Inform("Listing bindings", []),
- {VHostArg, _} = parse_vhost_flag(Args),
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
InfoKeys = [exchange_name, routing_key, queue_name, args],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
@@ -285,15 +274,37 @@ action(list_connections, Node, Args, Inform) ->
default_if_empty(Args, [user, peer_address, peer_port])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
- ArgAtoms).
+ ArgAtoms);
+
+action(Command, Node, Args, Inform) ->
+ {VHost, RemainingArgs} = parse_vhost_flag(Args),
+ action(Command, Node, VHost, RemainingArgs, Inform).
+
+action(set_permissions, Node, VHost, [Username, CPerm, WPerm, RPerm], Inform) ->
+ Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
+ call(Node, {rabbit_access_control, set_permissions,
+ [Username, VHost, CPerm, WPerm, RPerm]});
+
+action(clear_permissions, Node, VHost, [Username], Inform) ->
+ Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
+ call(Node, {rabbit_access_control, clear_permissions, [Username, VHost]});
+
+action(list_permissions, Node, VHost, [], Inform) ->
+ Inform("Listing permissions in vhost ~p", [VHost]),
+ display_list(call(Node, {rabbit_access_control, list_vhost_permissions,
+ [VHost]})).
parse_vhost_flag(Args) when is_list(Args) ->
- case Args of
- ["-p", VHost | RemainingArgs] ->
- {list_to_binary(VHost), RemainingArgs};
- RemainingArgs ->
- {<<"/">>, RemainingArgs}
- end.
+ case Args of
+ ["-p", VHost | RemainingArgs] ->
+ {VHost, RemainingArgs};
+ RemainingArgs ->
+ {"/", RemainingArgs}
+ end.
+
+parse_vhost_flag_bin(Args) ->
+ {VHost, RemainingArgs} = parse_vhost_flag(Args),
+ {list_to_binary(VHost), RemainingArgs}.
default_if_empty(List, Default) when is_list(List) ->
if List == [] ->
@@ -303,21 +314,17 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(
- fun (Result) ->
- io:fwrite(
- lists:flatten(
- rabbit_misc:intersperse(
- "\t",
- [format_info_item(Result, X) || X <- InfoItemKeys]))),
- io:nl()
- end,
- Results),
+ lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
+ X <- InfoItemKeys])
+ end, Results),
ok;
-
display_info_list(Other, _) ->
Other.
+display_row(Row) ->
+ io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
+ io:nl().
+
format_info_item(Items, Key) ->
{value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
case Info of
@@ -334,8 +341,10 @@ format_info_item(Items, Key) ->
end.
display_list(L) when is_list(L) ->
- lists:foreach(fun (I) ->
- io:format("~s~n", [binary_to_list(I)])
+ lists:foreach(fun (I) when is_binary(I) ->
+ io:format("~s~n", [url_encode(I)]);
+ (I) when is_tuple(I) ->
+ display_row([url_encode(V) || V <- tuple_to_list(I)])
end,
lists:sort(L)),
ok;
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 9a9220b5..183b6984 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -46,7 +46,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7f3a78e9..fc89cfca 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,11 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -79,7 +79,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -91,6 +91,7 @@
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -106,14 +107,16 @@
recover() ->
ok = rabbit_misc:table_foreach(
- fun(Exchange) -> ok = mnesia:write(Exchange) end,
- durable_exchanges),
+ fun(Exchange) -> ok = mnesia:write(rabbit_exchange,
+ Exchange, write)
+ end, rabbit_durable_exchange),
ok = rabbit_misc:table_foreach(
fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute)
- end, durable_routes),
- ok.
+ ok = mnesia:write(rabbit_route,
+ Route, write),
+ ok = mnesia:write(rabbit_reverse_route,
+ ReverseRoute, write)
+ end, rabbit_durable_route).
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -123,11 +126,11 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
arguments = Args},
rabbit_misc:execute_mnesia_transaction(
fun () ->
- case mnesia:wread({exchange, ExchangeName}) of
- [] -> ok = mnesia:write(Exchange),
+ case mnesia:wread({rabbit_exchange, ExchangeName}) of
+ [] -> ok = mnesia:write(rabbit_exchange, Exchange, write),
if Durable ->
- ok = mnesia:write(
- durable_exchanges, Exchange, write);
+ ok = mnesia:write(rabbit_durable_exchange,
+ Exchange, write);
true -> ok
end,
Exchange;
@@ -141,6 +144,8 @@ check_type(<<"direct">>) ->
direct;
check_type(<<"topic">>) ->
topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
rabbit_misc:protocol_error(
command_invalid, "invalid exchange type '~s'", [T]).
@@ -154,7 +159,7 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
[rabbit_misc:rs(Name), ActualType, RequiredType]).
lookup(Name) ->
- rabbit_misc:dirty_read({exchange, Name}).
+ rabbit_misc:dirty_read({rabbit_exchange, Name}).
lookup_or_die(Name) ->
case lookup(Name) of
@@ -166,6 +171,7 @@ lookup_or_die(Name) ->
list(VHostPath) ->
mnesia:dirty_match_object(
+ rabbit_exchange,
#exchange{name = rabbit_misc:r(VHostPath, exchange), _ = '_'}).
map(VHostPath, F) ->
@@ -207,64 +213,80 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(rabbit_route),
+ ExchangeName == Name,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{exchange_name = Name,
_ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
-
-route(X = #exchange{type = fanout}, _) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
-route_internal(#exchange{name = Name}, RoutingKey) ->
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
key = RoutingKey,
_ = '_'}},
- lookup_qpids(mnesia:dirty_select(route, [{MatchHead, [], ['$1']}])).
+ lookup_qpids(mnesia:dirty_select(rabbit_route, [{MatchHead, [], ['$1']}])).
lookup_qpids(Queues) ->
sets:fold(
fun(Key, Acc) ->
- case mnesia:dirty_read({amqqueue, Key}) of
+ case mnesia:dirty_read({rabbit_queue, Key}) of
[#amqqueue{pid = QPid}] -> [QPid | Acc];
[] -> Acc
end
@@ -275,10 +297,16 @@ lookup_qpids(Queues) ->
%% to be implemented for 0.91 ?
delete_exchange_bindings(ExchangeName) ->
- indexed_delete(
- #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
- fun delete_forward_routes/1, fun mnesia:delete_object/1).
+ [begin
+ ok = mnesia:delete_object(rabbit_reverse_route,
+ reverse_route(Route), write),
+ ok = delete_forward_routes(Route)
+ end || Route <- mnesia:match_object(
+ rabbit_route,
+ #route{binding = #binding{exchange_name = ExchangeName,
+ _ = '_'}},
+ write)],
+ ok.
delete_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, fun delete_forward_routes/1).
@@ -288,29 +316,27 @@ delete_transient_queue_bindings(QueueName) ->
delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
- indexed_delete(
- reverse_route(#route{binding = #binding{queue_name = QueueName,
- _ = '_'}}),
- fun mnesia:delete_object/1, FwdDeleteFun),
[begin
- [X] = mnesia:read({exchange, ExchangeName}),
+ ok = FwdDeleteFun(reverse_route(Route)),
+ ok = mnesia:delete_object(rabbit_reverse_route, Route, write)
+ end || Route <- mnesia:match_object(
+ rabbit_reverse_route,
+ reverse_route(
+ #route{binding = #binding{queue_name = QueueName,
+ _ = '_'}}),
+ write)],
+ [begin
+ [X] = mnesia:read({rabbit_exchange, ExchangeName}),
ok = maybe_auto_delete(X)
end || ExchangeName <- Exchanges],
ok.
-indexed_delete(Match, ForwardsDeleteFun, ReverseDeleteFun) ->
- [begin
- ok = ReverseDeleteFun(reverse_route(Route)),
- ok = ForwardsDeleteFun(Route)
- end || Route <- mnesia:match_object(Match)],
- ok.
-
delete_forward_routes(Route) ->
- ok = mnesia:delete_object(Route),
- ok = mnesia:delete_object(durable_routes, Route, write).
+ ok = mnesia:delete_object(rabbit_route, Route, write),
+ ok = mnesia:delete_object(rabbit_durable_route, Route, write).
delete_transient_forward_routes(Route) ->
- ok = mnesia:delete_object(Route).
+ ok = mnesia:delete_object(rabbit_route, Route, write).
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
@@ -319,7 +345,7 @@ exchanges_for_queue(QueueName) ->
_ = '_'}}),
sets:to_list(
sets:from_list(
- mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
+ mnesia:select(rabbit_reverse_route, [{MatchHead, [], ['$1']}]))).
contains(Table, MatchHead) ->
try
@@ -339,7 +365,7 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case mnesia:read({exchange, Exchange}) of
+ fun() -> case mnesia:read({rabbit_exchange, Exchange}) of
[] -> {error, not_found};
[X] -> Fun(X)
end
@@ -347,8 +373,8 @@ call_with_exchange(Exchange, Fun) ->
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
rabbit_misc:execute_mnesia_transaction(
- fun() -> case {mnesia:read({exchange, Exchange}),
- mnesia:read({amqqueue, Queue})} of
+ fun() -> case {mnesia:read({rabbit_exchange, Exchange}),
+ mnesia:read({rabbit_queue, Queue})} of
{[X], [Q]} -> Fun(X, Q);
{[ ], [_]} -> {error, exchange_not_found};
{[_], [ ]} -> {error, queue_not_found};
@@ -382,13 +408,15 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
- true -> Fun(durable_routes, #route{binding = Binding}, write);
+ true -> Fun(rabbit_durable_route,
+ #route{binding = Binding}, write);
false -> ok
end,
- [ok, ok] = [Fun(element(1, R), R, write) ||
- R <- tuple_to_list(route_with_reverse(Binding))],
+ {Route, ReverseRoute} = route_with_reverse(Binding),
+ ok = Fun(rabbit_route, Route, write),
+ ok = Fun(rabbit_reverse_route, ReverseRoute, write),
ok.
list_bindings(VHostPath) ->
@@ -399,6 +427,7 @@ list_bindings(VHostPath) ->
queue_name = QueueName,
args = Arguments}}
<- mnesia:dirty_match_object(
+ rabbit_route,
#route{binding = #binding{
exchange_name = rabbit_misc:r(VHostPath, exchange),
_ = '_'},
@@ -434,6 +463,67 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.
@@ -477,15 +567,15 @@ conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
%% we need to check for durable routes here too in case a bunch of
%% routes to durable queues have been removed temporarily as a
%% result of a node failure
- case contains(route, Match) orelse contains(durable_routes, Match) of
+ case contains(rabbit_route, Match) orelse contains(rabbit_durable_route, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
ok = delete_exchange_bindings(ExchangeName),
- ok = mnesia:delete({durable_exchanges, ExchangeName}),
- ok = mnesia:delete({exchange, ExchangeName}).
+ ok = mnesia:delete({rabbit_durable_exchange, ExchangeName}),
+ ok = mnesia:delete({rabbit_exchange, ExchangeName}).
%%----------------------------------------------------------------------------
%% EXTENDED API
@@ -501,7 +591,7 @@ list_exchange_bindings(ExchangeName) ->
#route{binding = #binding{queue_name = QueueName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
% Refactoring is left as an exercise for the reader
list_queue_bindings(QueueName) ->
@@ -511,4 +601,4 @@ list_queue_bindings(QueueName) ->
#route{binding = #binding{exchange_name = ExchangeName,
key = RoutingKey,
args = Arguments}}
- <- mnesia:dirty_match_object(Route)].
+ <- mnesia:dirty_match_object(rabbit_route, Route)].
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 060bed48..5c447792 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -95,13 +95,15 @@ collect_content(ChannelPid, MethodName) ->
true ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, got one for class ~w instead",
+ "expected content header for class ~w, "
+ "got one for class ~w instead",
[ClassId, HeaderClassId])
end;
_ ->
rabbit_misc:protocol_error(
command_invalid,
- "expected content header for class ~w, got non content header frame instead",
+ "expected content header for class ~w, "
+ "got non content header frame instead",
[ClassId])
end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 51c1665b..2be00503 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -82,7 +82,8 @@ guid() ->
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
G = case get(guid) of
- undefined -> {{gen_server:call(?SERVER, serial), self()}, 0};
+ undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
+ 0};
{S, I} -> {S, I+1}
end,
put(guid, G),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
new file mode 100644
index 00000000..3f9b6ebb
--- /dev/null
+++ b/src/rabbit_limiter.erl
@@ -0,0 +1,195 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_limiter).
+
+-behaviour(gen_server).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([start_link/1, shutdown/1]).
+-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(maybe_pid() :: pid() | 'undefined').
+
+-spec(start_link/1 :: (pid()) -> pid()).
+-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-record(lim, {prefetch_count = 0,
+ ch_pid,
+ queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ volume = 0}).
+%% 'Notify' is a boolean that indicates whether a queue should be
+%% notified of a change in the limit or volume that may allow it to
+%% deliver more messages via the limiter's channel.
+
+%%----------------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------------
+
+start_link(ChPid) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ Pid.
+
+shutdown(undefined) ->
+ ok;
+shutdown(LimiterPid) ->
+ unlink(LimiterPid),
+ gen_server2:cast(LimiterPid, shutdown).
+
+limit(undefined, 0) ->
+ ok;
+limit(LimiterPid, PrefetchCount) ->
+ gen_server2:cast(LimiterPid, {limit, PrefetchCount}).
+
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit
+can_send(undefined, _QPid) ->
+ true;
+can_send(LimiterPid, QPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid}, infinity) end).
+
+%% Let the limiter know that the channel has received some acks from a
+%% consumer
+ack(undefined, _Count) -> ok;
+ack(LimiterPid, Count) -> gen_server2:cast(LimiterPid, {ack, Count}).
+
+register(undefined, _QPid) -> ok;
+register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
+
+unregister(undefined, _QPid) -> ok;
+unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([ChPid]) ->
+ {ok, #lim{ch_pid = ChPid} }.
+
+handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+ case limit_reached(State) of
+ true -> {reply, false, limit_queue(QPid, State)};
+ false -> {reply, true, State#lim{volume = Volume + 1}}
+ end.
+
+handle_cast(shutdown, State) ->
+ {stop, normal, State};
+
+handle_cast({limit, PrefetchCount}, State) ->
+ {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
+
+handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+ NewVolume = if Volume == 0 -> 0;
+ true -> Volume - Count
+ end,
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+
+handle_cast({register, QPid}, State) ->
+ {noreply, remember_queue(QPid, State)};
+
+handle_cast({unregister, QPid}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+terminate(_, _) ->
+ ok.
+
+code_change(_, State, _) ->
+ State.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing
+%%----------------------------------------------------------------------------
+
+maybe_notify(OldState, NewState) ->
+ case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
+ end.
+
+limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+ Limit =/= 0 andalso Volume >= Limit.
+
+remember_queue(QPid, State = #lim{queues = Queues}) ->
+ case dict:is_key(QPid, Queues) of
+ false -> MRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ true -> State
+ end.
+
+forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ case dict:find(QPid, Queues) of
+ {ok, {MRef, _}} ->
+ true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = dict:erase(QPid, Queues)};
+ error -> State
+ end.
+
+limit_queue(QPid, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
+ State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+
+notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ {QList, NewQueues} =
+ dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], dict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
+ case length(QList) of
+ 0 -> ok;
+ L ->
+ %% We randomly vary the position of queues in the list,
+ %% thus ensuring that each queue has an equal chance of
+ %% being notified first.
+ {L1, L2} = lists:split(random:uniform(L), QList),
+ [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
+ ok
+ end,
+ State#lim{queues = NewQueues}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1fcd9a61..eced0b3c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -50,6 +50,7 @@
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
+-export([start_applications/1, stop_applications/1]).
-import(mnesia).
-import(lists).
@@ -106,6 +107,8 @@
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
+-spec(start_applications/1 :: ([atom()]) -> 'ok').
+-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-endif.
@@ -232,7 +235,7 @@ filter_exit_map(F, L) ->
with_user(Username, Thunk) ->
fun () ->
- case mnesia:read({user, Username}) of
+ case mnesia:read({rabbit_user, Username}) of
[] ->
mnesia:abort({no_such_user, Username});
[_U] ->
@@ -242,7 +245,7 @@ with_user(Username, Thunk) ->
with_vhost(VHostPath, Thunk) ->
fun () ->
- case mnesia:read({vhost, VHostPath}) of
+ case mnesia:read({rabbit_vhost, VHostPath}) of
[] ->
mnesia:abort({no_such_vhost, VHostPath});
[_V] ->
@@ -385,3 +388,32 @@ format_stderr(Fmt, Args) ->
io:format(Fmt, Args)
end,
ok.
+
+manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
+ Iterate(fun (App, Acc) ->
+ case Do(App) of
+ ok -> [App | Acc];
+ {error, {SkipError, _}} -> Acc;
+ {error, Reason} ->
+ lists:foreach(Undo, Acc),
+ throw({error, {ErrorTag, App, Reason}})
+ end
+ end, [], Apps),
+ ok.
+
+start_applications(Apps) ->
+ manage_applications(fun lists:foldl/3,
+ fun application:start/1,
+ fun application:stop/1,
+ already_started,
+ cannot_start_application,
+ Apps).
+
+stop_applications(Apps) ->
+ manage_applications(fun lists:foldr/3,
+ fun application:stop/1,
+ fun application:start/1,
+ not_started,
+ cannot_stop_application,
+ Apps).
+
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d19c37cb..15213861 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -100,33 +100,50 @@ force_reset() -> reset(true).
%%--------------------------------------------------------------------
table_definitions() ->
- [{user, [{disc_copies, [node()]},
- {attributes, record_info(fields, user)}]},
- {user_vhost, [{type, bag},
- {disc_copies, [node()]},
- {attributes, record_info(fields, user_vhost)},
- {index, [virtual_host]}]},
- {vhost, [{disc_copies, [node()]},
- {attributes, record_info(fields, vhost)}]},
- {rabbit_config, [{disc_copies, [node()]}]},
- {listener, [{type, bag},
- {attributes, record_info(fields, listener)}]},
- {durable_routes, [{disc_copies, [node()]},
- {record_name, route},
- {attributes, record_info(fields, route)}]},
- {route, [{type, ordered_set},
- {attributes, record_info(fields, route)}]},
- {reverse_route, [{type, ordered_set},
- {attributes, record_info(fields, reverse_route)}]},
- {durable_exchanges, [{disc_copies, [node()]},
- {record_name, exchange},
- {attributes, record_info(fields, exchange)}]},
- {exchange, [{attributes, record_info(fields, exchange)}]},
- {durable_queues, [{disc_copies, [node()]},
- {record_name, amqqueue},
- {attributes, record_info(fields, amqqueue)}]},
- {amqqueue, [{attributes, record_info(fields, amqqueue)},
- {index, [pid]}]}].
+ [{rabbit_user,
+ [{record_name, user},
+ {attributes, record_info(fields, user)},
+ {disc_copies, [node()]}]},
+ {rabbit_user_permission,
+ [{record_name, user_permission},
+ {attributes, record_info(fields, user_permission)},
+ {disc_copies, [node()]}]},
+ {rabbit_vhost,
+ [{record_name, vhost},
+ {attributes, record_info(fields, vhost)},
+ {disc_copies, [node()]}]},
+ {rabbit_config,
+ [{disc_copies, [node()]}]},
+ {rabbit_listener,
+ [{record_name, listener},
+ {attributes, record_info(fields, listener)},
+ {type, bag}]},
+ {rabbit_durable_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {disc_copies, [node()]}]},
+ {rabbit_route,
+ [{record_name, route},
+ {attributes, record_info(fields, route)},
+ {type, ordered_set}]},
+ {rabbit_reverse_route,
+ [{record_name, reverse_route},
+ {attributes, record_info(fields, reverse_route)},
+ {type, ordered_set}]},
+ {rabbit_durable_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)},
+ {disc_copies, [node()]}]},
+ {rabbit_exchange,
+ [{record_name, exchange},
+ {attributes, record_info(fields, exchange)}]},
+ {rabbit_durable_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)},
+ {disc_copies, [node()]}]},
+ {rabbit_queue,
+ [{record_name, amqqueue},
+ {attributes, record_info(fields, amqqueue)}]}].
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
@@ -243,8 +260,8 @@ init_db(ClusterNodes) ->
%% NB: we cannot use rabbit_log here since
%% it may not have been started yet
error_logger:warning_msg(
- "schema integrity check failed: ~p~n" ++
- "moving database to backup location " ++
+ "schema integrity check failed: ~p~n"
+ "moving database to backup location "
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 99ea37d8..2dbd5a5a 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -123,6 +123,7 @@ stop_tcp_listener(Host, Port) ->
tcp_listener_started(IPAddress, Port) ->
ok = mnesia:dirty_write(
+ rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
@@ -130,19 +131,20 @@ tcp_listener_started(IPAddress, Port) ->
tcp_listener_stopped(IPAddress, Port) ->
ok = mnesia:dirty_delete_object(
+ rabbit_listener,
#listener{node = node(),
protocol = tcp,
host = tcp_host(IPAddress),
port = Port}).
active_listeners() ->
- rabbit_misc:dirty_read_all(listener).
+ rabbit_misc:dirty_read_all(rabbit_listener).
node_listeners(Node) ->
- mnesia:dirty_read(listener, Node).
+ mnesia:dirty_read(rabbit_listener, Node).
on_node_down(Node) ->
- ok = mnesia:dirty_delete(listener, Node).
+ ok = mnesia:dirty_delete(rabbit_listener, Node).
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 94033a4f..f4fa4599 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -49,6 +49,8 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
+-define(HIBERNATE_AFTER, 10000).
+
-define(MAX_WRAP_ENTRIES, 500).
-define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
@@ -93,7 +95,7 @@ start_link() ->
transaction(MessageList) ->
?LOGDEBUG("transaction ~p~n", [MessageList]),
TxnKey = rabbit_guid:guid(),
- gen_server:call(?SERVER, {transaction, TxnKey, MessageList}).
+ gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity).
extend_transaction(TxnKey, MessageList) ->
?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]),
@@ -105,17 +107,17 @@ dirty_work(MessageList) ->
commit_transaction(TxnKey) ->
?LOGDEBUG("commit_transaction ~p~n", [TxnKey]),
- gen_server:call(?SERVER, {commit_transaction, TxnKey}).
+ gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity).
rollback_transaction(TxnKey) ->
?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]),
gen_server:cast(?SERVER, {rollback_transaction, TxnKey}).
force_snapshot() ->
- gen_server:call(?SERVER, force_snapshot).
+ gen_server:call(?SERVER, force_snapshot, infinity).
serial() ->
- gen_server:call(?SERVER, serial).
+ gen_server:call(?SERVER, serial, infinity).
%%--------------------------------------------------------------------
@@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) ->
do_noreply(internal_commit(From, Key, NewState));
handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
-handle_call(force_snapshot, _From, State = #pstate{log_handle = LH,
- snapshot = Snapshot}) ->
- ok = take_snapshot(LH, Snapshot),
- do_reply(ok, State);
+handle_call(force_snapshot, _From, State) ->
+ do_reply(ok, flush(true, State));
handle_call(serial, _From,
State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
do_reply(Serial, State);
@@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
+handle_info(timeout, State = #pstate{deadline = infinity}) ->
+ State1 = flush(true, State),
+ %% TODO: Once we drop support for R11B-5, we can change this to
+ %% {noreply, State1, hibernate};
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(timeout, State) ->
- {noreply, flush(State)};
+ do_noreply(flush(State));
handle_info(_Info, State) ->
{noreply, State}.
@@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
rabbit_log:info("Saving persister log in ~p~n", [OldFileName]),
ok = take_snapshot(LogHandle, OldFileName, Snapshot).
-maybe_take_snapshot(State = #pstate{entry_count = EntryCount, log_handle = LH,
- snapshot = Snapshot})
- when EntryCount >= ?MAX_WRAP_ENTRIES ->
+maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
+ log_handle = LH,
+ snapshot = Snapshot})
+ when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
ok = take_snapshot(LH, Snapshot),
State#pstate{entry_count = 0};
-maybe_take_snapshot(State) ->
+maybe_take_snapshot(_Force, State) ->
State.
later_ms(DeltaMilliSec) ->
@@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- infinity;
+ ?HIBERNATE_AFTER;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -314,18 +320,18 @@ do_noreply(State = #pstate{deadline = Deadline}) ->
do_reply(Reply, State = #pstate{deadline = Deadline}) ->
{reply, Reply, State, compute_timeout(Deadline)}.
-flush(State = #pstate{pending_logs = PendingLogs,
- pending_replies = Waiting,
- log_handle = LogHandle}) ->
- State1 = if
- PendingLogs /= [] ->
+flush(State) -> flush(false, State).
+
+flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
+ pending_replies = Waiting,
+ log_handle = LogHandle}) ->
+ State1 = if PendingLogs /= [] ->
disk_log:alog(LogHandle, lists:reverse(PendingLogs)),
- maybe_take_snapshot(
- State#pstate{
- entry_count = State#pstate.entry_count + 1});
- true ->
+ State#pstate{entry_count = State#pstate.entry_count + 1};
+ true ->
State
end,
+ State2 = maybe_take_snapshot(ForceSnapshot, State1),
if Waiting /= [] ->
ok = disk_log:sync(LogHandle),
lists:foreach(fun (From) -> gen_server:reply(From, ok) end,
@@ -333,7 +339,7 @@ flush(State = #pstate{pending_logs = PendingLogs,
true ->
ok
end,
- State1#pstate{deadline = infinity,
+ State2#pstate{deadline = infinity,
pending_logs = [],
pending_replies = []}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3f8d7cac..ef8038e7 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -161,10 +161,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
info(Pid) ->
- gen_server:call(Pid, info).
+ gen_server:call(Pid, info, infinity).
info(Pid, Items) ->
- case gen_server:call(Pid, {info, Items}) of
+ case gen_server:call(Pid, {info, Items}, infinity) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -173,7 +173,8 @@ setup_profiling() ->
Value = rabbit_misc:get_config(profiling_enabled, false),
case Value of
once ->
- rabbit_log:info("Enabling profiling for this connection, and disabling for subsequent.~n"),
+ rabbit_log:info("Enabling profiling for this connection, "
+ "and disabling for subsequent.~n"),
rabbit_misc:set_config(profiling_enabled, false),
fprof:trace(start);
true ->
@@ -230,8 +231,12 @@ start_connection(Parent, Deb, ClientSock) ->
connection_state = pre_init},
handshake, 8))
catch
- Ex -> rabbit_log:error("error on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> (if Ex == connection_closed_abruptly ->
+ fun rabbit_log:warning/2;
+ true ->
+ fun rabbit_log:error/2
+ end)("exception on TCP connection ~p from ~s:~p~n~p~n",
+ [self(), PeerAddressS, PeerPort, Ex])
after
rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
[self(), PeerAddressS, PeerPort]),
@@ -283,6 +288,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
throw(E);
+ {channel_exit, Channel, Reason} ->
+ mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
{terminate_channel, Channel, Ref1} ->
@@ -350,6 +357,14 @@ terminate_channel(Channel, Ref, State) ->
end,
State.
+handle_channel_exit(Channel, Reason, State) ->
+ %% We remove the channel from the inbound map only. That allows
+ %% the channel to be re-opened, but also means the remaining
+ %% cleanup, including possibly closing the connection, is deferred
+ %% until we get the (normal) exit signal.
+ erase({channel, Channel}),
+ handle_exception(State, Channel, Reason).
+
handle_dependent_exit(Pid, normal, State) ->
channel_cleanup(Pid),
maybe_close(State);
@@ -404,7 +419,8 @@ wait_for_channel_termination(N, TimerRef) ->
normal -> ok;
_ ->
rabbit_log:error(
- "connection ~p, channel ~p - error while terminating:~n~p~n",
+ "connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
[self(), Channel, Reason])
end,
wait_for_channel_termination(N-1, TimerRef)
@@ -709,8 +725,8 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
vhost = VHost}} = State,
WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/4,
- [self(), WriterPid, Username, VHost]),
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
put({channel, Channel}, {chpid, ChPid}),
put({chpid, ChPid}, {channel, Channel}),
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index ad653a2f..0b06a063 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -32,7 +32,7 @@
-module(rabbit_router).
-include("rabbit.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([start_link/0,
deliver/5]).
@@ -58,7 +58,7 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-ifdef(BUG19758).
@@ -100,7 +100,7 @@ deliver_per_node(NodeQPids, Mandatory = false, Immediate = false,
%% than the non-immediate case below.
{ok, lists:flatmap(
fun ({Node, QPids}) ->
- gen_server:cast(
+ gen_server2:cast(
{?SERVER, Node},
{deliver, QPids, Mandatory, Immediate, Txn, Message}),
QPids
@@ -110,9 +110,10 @@ deliver_per_node(NodeQPids, Mandatory, Immediate,
Txn, Message) ->
R = rabbit_misc:upmap(
fun ({Node, QPids}) ->
- try gen_server:call(
+ try gen_server2:call(
{?SERVER, Node},
- {deliver, QPids, Mandatory, Immediate, Txn, Message})
+ {deliver, QPids, Mandatory, Immediate, Txn, Message},
+ infinity)
catch
_Class:_Reason ->
%% TODO: figure out what to log (and do!) here
@@ -143,7 +144,7 @@ handle_call({deliver, QPids, Mandatory, Immediate, Txn, Message},
spawn(
fun () ->
R = run_bindings(QPids, Mandatory, Immediate, Txn, Message),
- gen_server:reply(From, R)
+ gen_server2:reply(From, R)
end),
{noreply, State}.
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 9e4c9c8a..2a365ce1 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -47,7 +47,7 @@ init({{File, Suffix}, []}) ->
case rabbit_misc:append_file(File, Suffix) of
ok -> ok;
{error, Error} ->
- rabbit_log:error("Failed to append contents of " ++
+ rabbit_log:error("Failed to append contents of "
"sasl log file '~s' to '~s':~n~p~n",
[File, [File, Suffix], Error])
end,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 6706ecd1..8f0a3a89 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -45,6 +45,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ passed = test_priority_queue(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -55,6 +56,62 @@ all_tests() ->
passed = test_server_status(),
passed.
+test_priority_queue() ->
+
+ false = priority_queue:is_queue(not_a_queue),
+
+ %% empty Q
+ Q = priority_queue:new(),
+ {true, true, 0, [], []} = test_priority_queue(Q),
+
+ %% 1-4 element no-priority Q
+ true = lists:all(fun (X) -> X =:= passed end,
+ lists:map(fun test_simple_n_element_queue/1,
+ lists:seq(1, 4))),
+
+ %% 1-element priority Q
+ Q1 = priority_queue:in(foo, 1, priority_queue:new()),
+ {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+
+ %% 2-element same-priority Q
+ Q2 = priority_queue:in(bar, 1, Q1),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q2),
+
+ %% 2-element different-priority Q
+ Q3 = priority_queue:in(bar, 2, Q1),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q3),
+
+ %% 1-element negative priority Q
+ Q4 = priority_queue:in(foo, -1, priority_queue:new()),
+ {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+
+ passed.
+
+priority_queue_in_all(Q, L) ->
+ lists:foldl(fun (X, Acc) -> priority_queue:in(X, Acc) end, Q, L).
+
+priority_queue_out_all(Q) ->
+ case priority_queue:out(Q) of
+ {empty, _} -> [];
+ {{value, V}, Q1} -> [V | priority_queue_out_all(Q1)]
+ end.
+
+test_priority_queue(Q) ->
+ {priority_queue:is_queue(Q),
+ priority_queue:is_empty(Q),
+ priority_queue:len(Q),
+ priority_queue:to_list(Q),
+ priority_queue_out_all(Q)}.
+
+test_simple_n_element_queue(N) ->
+ Items = lists:seq(1, N),
+ Q = priority_queue_in_all(priority_queue:new(), Items),
+ ToListRes = [{0, X} || X <- Items],
+ {true, false, N, ToListRes, Items} = test_priority_queue(Q),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -450,17 +507,16 @@ test_user_management() ->
{error, {no_such_vhost, _}} =
control_action(delete_vhost, ["/testhost"]),
{error, {no_such_user, _}} =
- control_action(map_user_vhost, ["foo", "/"]),
+ control_action(set_permissions, ["foo", ".*", ".*", ".*"]),
{error, {no_such_user, _}} =
- control_action(unmap_user_vhost, ["foo", "/"]),
+ control_action(clear_permissions, ["foo"]),
{error, {no_such_user, _}} =
- control_action(list_user_vhosts, ["foo"]),
- {error, {no_such_vhost, _}} =
- control_action(map_user_vhost, ["guest", "/testhost"]),
+ control_action(list_user_permissions, ["foo"]),
{error, {no_such_vhost, _}} =
- control_action(unmap_user_vhost, ["guest", "/testhost"]),
- {error, {no_such_vhost, _}} =
- control_action(list_vhost_users, ["/testhost"]),
+ control_action(list_permissions, ["-p", "/testhost"]),
+ {error, {invalid_regexp, _, _}} =
+ control_action(set_permissions, ["guest", "+foo", ".*", ".*"]),
+
%% user creation
ok = control_action(add_user, ["foo", "bar"]),
{error, {user_already_exists, _}} =
@@ -475,13 +531,16 @@ test_user_management() ->
ok = control_action(list_vhosts, []),
%% user/vhost mapping
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
- ok = control_action(list_user_vhosts, ["foo"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
+ ok = control_action(list_permissions, ["-p", "/testhost"]),
+ ok = control_action(list_user_permissions, ["foo"]),
%% user/vhost unmapping
- ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
- ok = control_action(unmap_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
+ ok = control_action(clear_permissions, ["-p", "/testhost", "foo"]),
%% vhost deletion
ok = control_action(delete_vhost, ["/testhost"]),
@@ -490,7 +549,8 @@ test_user_management() ->
%% deleting a populated vhost
ok = control_action(add_vhost, ["/testhost"]),
- ok = control_action(map_user_vhost, ["foo", "/testhost"]),
+ ok = control_action(set_permissions, ["-p", "/testhost",
+ "foo", ".*", ".*", ".*"]),
ok = control_action(delete_vhost, ["/testhost"]),
%% user deletion