summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-01-16 18:42:13 +0000
committerMatthias Radestock <matthias@lshift.net>2009-01-16 18:42:13 +0000
commitd5b5f13f410972545832c1acce2df85134ec4146 (patch)
tree2f490e5f7b4bb4a3e166b3eb5951ef7678e1055c
parent00c7b0249193201cf4e7eb209895802595c32684 (diff)
parentd5197fa54d6831a67c3f96e0dab7f7225f0ad98a (diff)
downloadrabbitmq-server-bug20099.tar.gz
merge default into bug20099bug20099
which involved a few conflict resolutions. The most complex of these was dealing with the termination of the limiter; we now unlink from it first so we don't get a spurious EXIT signal.
-rw-r--r--.hgignore1
-rw-r--r--Makefile16
-rw-r--r--ebin/rabbit.app56
-rw-r--r--ebin/rabbit_app.in20
-rw-r--r--generate_app10
-rw-r--r--packaging/RPMS/Fedora/Makefile23
-rw-r--r--packaging/RPMS/Fedora/init.d4
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec44
-rw-r--r--packaging/debs/Debian/debian/init.d1
-rw-r--r--packaging/debs/Debian/debian/postinst4
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_amqqueue.erl27
-rw-r--r--src/rabbit_amqqueue_process.erl177
-rw-r--r--src/rabbit_channel.erl97
-rw-r--r--src/rabbit_control.erl22
-rw-r--r--src/rabbit_exchange.erl135
-rw-r--r--src/rabbit_limiter.erl195
-rw-r--r--src/rabbit_multi.erl2
18 files changed, 603 insertions, 233 deletions
diff --git a/.hgignore b/.hgignore
index 28f9cfd8..35607765 100644
--- a/.hgignore
+++ b/.hgignore
@@ -9,6 +9,7 @@ syntax: regexp
^include/rabbit_framing.hrl$
^src/rabbit_framing.erl$
^rabbit.plt$
+^ebin/rabbit.app$
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
diff --git a/Makefile b/Makefile
index b8fa2cfa..fede89e1 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,8 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl)
-TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+BEAM_TARGETS=$(EBIN_DIR)/rabbit_framing.beam $(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam,$(SOURCES))
+TARGETS=$(EBIN_DIR)/rabbit.app $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.pod, %.gz, $(wildcard docs/*.[0-9].pod))
@@ -39,6 +40,9 @@ ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
+$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
+ escript generate_app $(EBIN_DIR) < $< > $@
+
$(EBIN_DIR)/gen_server2.beam: $(SOURCE_DIR)/gen_server2.erl
erlc $(ERLC_OPTS) $<
@@ -47,20 +51,20 @@ $(EBIN_DIR)/%.beam: $(SOURCE_DIR)/%.erl $(INCLUDE_DIR)/rabbit_framing.hrl $(INCL
# ERLC_EMULATOR="erl -smp" erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $<
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) > $@
+ $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_PATH) $@
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_PATH)
- $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) > $@
+ $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_PATH) $@
$(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script: $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.rel $(TARGETS)
erl -noshell -eval 'systools:make_script("ebin/rabbit", [{path, ["ebin"]}]), halt().'
-dialyze: $(TARGETS)
+dialyze: $(BEAM_TARGETS)
dialyzer -c $?
clean: cleandb
rm -f $(EBIN_DIR)/*.beam
- rm -f $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
+ rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script
rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f docs/*.[0-9].gz
@@ -126,7 +130,7 @@ srcdist: distclean
cp BUILD.in $(TARGET_SRC_DIR)/BUILD
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/BUILD
- sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app
+ sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile $(TARGET_SRC_DIR)
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
deleted file mode 100644
index ca5aec6f..00000000
--- a/ebin/rabbit.app
+++ /dev/null
@@ -1,56 +0,0 @@
-{application, rabbit, %% -*- erlang -*-
- [{description, "RabbitMQ"},
- {id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
- {modules, [rabbit_access_control,
- rabbit_alarm,
- rabbit_amqqueue,
- rabbit_amqqueue_process,
- rabbit_amqqueue_sup,
- rabbit_binary_generator,
- rabbit_binary_parser,
- rabbit_channel,
- rabbit_control,
- rabbit,
- rabbit_error_logger,
- rabbit_error_logger_file_h,
- rabbit_exchange,
- rabbit_framing_channel,
- rabbit_framing,
- rabbit_heartbeat,
- rabbit_load,
- rabbit_log,
- rabbit_memsup_linux,
- rabbit_misc,
- rabbit_mnesia,
- rabbit_multi,
- rabbit_networking,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_reader,
- rabbit_router,
- rabbit_sasl_report_file_h,
- rabbit_sup,
- rabbit_tests,
- rabbit_tracer,
- rabbit_writer,
- tcp_acceptor,
- tcp_acceptor_sup,
- tcp_client_sup,
- tcp_listener,
- tcp_listener_sup]},
- {registered, [rabbit_amqqueue_sup,
- rabbit_log,
- rabbit_node_monitor,
- rabbit_persister,
- rabbit_router,
- rabbit_sup,
- rabbit_tcp_client_sup]},
- {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
- {mod, {rabbit, []}},
- {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
- {extra_startup_steps, []},
- {default_user, <<"guest">>},
- {default_pass, <<"guest">>},
- {default_vhost, <<"/">>},
- {memory_alarms, auto}]}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
new file mode 100644
index 00000000..e2f36c0f
--- /dev/null
+++ b/ebin/rabbit_app.in
@@ -0,0 +1,20 @@
+{application, rabbit, %% -*- erlang -*-
+ [{description, "RabbitMQ"},
+ {id, "RabbitMQ"},
+ {vsn, "%%VERSION%%"},
+ {modules, []},
+ {registered, [rabbit_amqqueue_sup,
+ rabbit_log,
+ rabbit_node_monitor,
+ rabbit_persister,
+ rabbit_router,
+ rabbit_sup,
+ rabbit_tcp_client_sup]},
+ {applications, [kernel, stdlib, sasl, mnesia, os_mon]},
+ {mod, {rabbit, []}},
+ {env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
+ {extra_startup_steps, []},
+ {default_user, <<"guest">>},
+ {default_pass, <<"guest">>},
+ {default_vhost, <<"/">>},
+ {memory_alarms, auto}]}]}.
diff --git a/generate_app b/generate_app
new file mode 100644
index 00000000..62301292
--- /dev/null
+++ b/generate_app
@@ -0,0 +1,10 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+
+main([BeamDir]) ->
+ Modules = [list_to_atom(filename:basename(F, ".beam")) ||
+ F <- filelib:wildcard("*.beam", BeamDir)],
+ {ok, {application, Application, Properties}} = io:read(''),
+ NewProperties = lists:keyreplace(modules, 1, Properties,
+ {modules, Modules}),
+ io:format("~p.", [{application, Application, NewProperties}]).
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index c05f14a7..cf3a93df 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -6,21 +6,38 @@ TOP_DIR=$(shell pwd)
#only checks build-dependencies using rpms, not debs
DEFINES=--define '_topdir $(TOP_DIR)' --define '_tmppath $(TOP_DIR)/tmp' --define 'debian 1'
+ifndef RPM_OS
+RPM_OS=fedora
+endif
+
+ifeq "x$(RPM_OS)" "xsuse"
+REQUIRES=/sbin/chkconfig /sbin/service
+OS_DEFINES=--define '_initrddir /etc/init.d'
+RELEASE_OS=.suse
+else
+REQUIRES=chkconfig initscripts
+OS_DEFINES=--define '_initrddir /etc/rc.d/init.d'
+RELEASE_OS=
+endif
+
rpms: clean server
prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
- sed -i 's/%%VERSION%%/$(VERSION)/' SPECS/rabbitmq-server.spec
+ sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|;s|%%RELEASE_OS%%|$(RELEASE_OS)|' \
+ SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
cp rabbitmqctl_wrapper SOURCES/rabbitmq-server.wrapper
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
- rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --target i386
- rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) --define '_arch x86_64' \
+ rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
+ --target i386
+ rpmbuild -ba SPECS/rabbitmq-server.spec $(DEFINES) $(OS_DEFINES) \
+ --define '_libdir /usr/lib64' --define '_arch x86_64' \
--define '_defaultdocdir /usr/share/doc' --target x86_64
clean:
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index 27f150f9..a006a5a7 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -16,7 +16,6 @@
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DAEMON_NAME=rabbitmq-multi
DAEMON=/usr/lib/rabbitmq/bin/$DAEMON_NAME
NAME=rabbitmq-server
@@ -29,9 +28,6 @@ LOCK_FILE=/var/lock/subsys/$NAME
test -x $DAEMON || exit 0
-# source function library
-. /etc/rc.d/init.d/functions
-
# Include rabbitmq defaults if available
if [ -f /etc/default/rabbitmq ] ; then
. /etc/default/rabbitmq
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 13cfb037..241afd71 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -1,6 +1,6 @@
Name: rabbitmq-server
Version: %%VERSION%%
-Release: 1
+Release: 1%%RELEASE_OS%%
License: MPLv1.1
Group: Development/Libraries
Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.gz
@@ -17,24 +17,18 @@ Requires: erlang, logrotate
Packager: Hubert Plociniczak <hubert@lshift.net>
BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-%{_arch}-root
Summary: The RabbitMQ server
-Requires(post): chkconfig
-Requires(pre): chkconfig initscripts
+Requires(post): %%REQUIRES%%
+Requires(pre): %%REQUIRES%%
%description
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-%ifarch x86_64
- %define _defaultlibdir /usr/lib64
-%else
- %define _defaultlibdir /usr/lib
-%endif
-
-%define _erllibdir %{_defaultlibdir}/erlang/lib
-%define _rabbitbindir %{_defaultlibdir}/rabbitmq/bin
+%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
+%define _rabbit_libdir %{_libdir}/rabbitmq
-%define _maindir %{buildroot}%{_erllibdir}/rabbitmq_server-%{version}
+%define _maindir %{buildroot}%{_rabbit_erllibdir}
%pre
if [ $1 -gt 1 ]; then
@@ -53,25 +47,21 @@ make
rm -rf %{buildroot}
make install TARGET_DIR=%{_maindir} \
- SBIN_DIR=%{buildroot}%{_rabbitbindir} \
+ SBIN_DIR=%{buildroot}%{_rabbit_libdir}/bin \
MAN_DIR=%{buildroot}%{_mandir}
mkdir -p %{buildroot}/var/lib/rabbitmq/mnesia
mkdir -p %{buildroot}/var/log/rabbitmq
-mkdir -p %{buildroot}/etc/rc.d/init.d/
+mkdir -p %{buildroot}%{_initrddir}
#Copy all necessary lib files etc.
-install -m 0755 %SOURCE1 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-chmod 0755 %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-%ifarch x86_64
- sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}/etc/rc.d/init.d/rabbitmq-server
-%endif
+install -m 0755 %SOURCE1 %{buildroot}%{_initrddir}/rabbitmq-server
+chmod 0755 %{buildroot}%{_initrddir}/rabbitmq-server
+sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_initrddir}/rabbitmq-server
mkdir -p %{buildroot}%{_sbindir}
install -m 0755 %SOURCE2 %{buildroot}%{_sbindir}/rabbitmqctl
-%ifarch x86_64
- sed -i 's/\/usr\/lib\//\/usr\/lib64\//' %{buildroot}%{_sbindir}/rabbitmqctl
-%endif
+sed -i 's|/usr/lib/|%{_libdir}/|' %{buildroot}%{_sbindir}/rabbitmqctl
mkdir -p %{buildroot}/etc/logrotate.d
install -m 0644 %SOURCE3 %{buildroot}/etc/logrotate.d/rabbitmq-server
@@ -81,8 +71,10 @@ rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL
#Build the list of files
rm -f %{_builddir}/filelist.%{name}.rpm
echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm
-(cd %{buildroot}; find . ! -regex '\./etc.*' \
- -type f | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
+(cd %{buildroot}; \
+ find . -type f ! -regex '\./etc.*' \
+ ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \
+ | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm)
%post
# create rabbitmq group
@@ -116,7 +108,9 @@ fi
%defattr(-,root,root,-)
%dir /var/lib/rabbitmq
%dir /var/log/rabbitmq
-/etc/rc.d/init.d/rabbitmq-server
+%{_rabbit_erllibdir}
+%{_rabbit_libdir}
+%{_initrddir}/rabbitmq-server
%config(noreplace) /etc/logrotate.d/rabbitmq-server
%doc LICENSE LICENSE-MPL-RabbitMQ INSTALL
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
index ace474c5..70dd0adf 100644
--- a/packaging/debs/Debian/debian/init.d
+++ b/packaging/debs/Debian/debian/init.d
@@ -9,7 +9,6 @@
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
-PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DAEMON=/usr/lib/rabbitmq/bin/rabbitmq-multi
NAME=rabbitmq-server
DESC=rabbitmq-server
diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst
index 495b8331..05fb179c 100644
--- a/packaging/debs/Debian/debian/postinst
+++ b/packaging/debs/Debian/debian/postinst
@@ -25,8 +25,8 @@ fi
# create rabbitmq user
if ! getent passwd rabbitmq >/dev/null; then
- adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq --no-create-home rabbitmq
- usermod -c "RabbitMQ messaging server" rabbitmq
+ adduser --system --ingroup rabbitmq --home /var/lib/rabbitmq \
+ --no-create-home --gecos "RabbitMQ messaging server" rabbitmq
fi
chown -R rabbitmq:rabbitmq /var/lib/rabbitmq
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b73090fc..36270efd 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -186,6 +186,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..3b0bf12c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,9 +37,9 @@
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2]).
--export([commit_all/2, rollback_all/2, notify_down_all/2]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, unblock/2]).
+-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-import(mnesia).
@@ -91,15 +91,17 @@
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(rollback_all/2 :: ([pid()], txn()) -> ok_or_errors()).
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -259,16 +261,22 @@ notify_down_all(QPids, ChPid) ->
fun (QPid) -> gen_server:call(QPid, {notify_down, ChPid}, Timeout) end,
QPids).
+limit_all(QPids, ChPid, LimiterPid) ->
+ safe_pmap_ok(
+ fun (_) -> ok end,
+ fun (QPid) -> gen_server:cast(QPid, {limit, ChPid, LimiterPid}) end,
+ QPids).
+
claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
gen_server:call(QPid, {claim_queue, ReaderPid}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -276,6 +284,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
+unblock(QPid, ChPid) ->
+ gen_server:cast(QPid, {unblock, ChPid}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 709e355e..b4fd3fcc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
@@ -62,9 +62,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
start_link(Q) ->
- gen_server:start_link(?MODULE, Q, []).
+ gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
@@ -131,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
+ true ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
+ false ->
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
attempt_delivery(none, Message, State) ->
@@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
@@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) ->
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -488,7 +502,8 @@ i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
i(auto_delete, #q{q = #amqqueue{auto_delete = AutoDelete}}) -> AutoDelete;
i(arguments, #q{q = #amqqueue{arguments = Arguments}}) -> Arguments;
-i(pid, #q{q = #amqqueue{pid = Pid}}) -> Pid;
+i(pid, _) ->
+ self();
i(messages_ready, #q{message_buffer = MessageBuffer}) ->
queue:len(MessageBuffer);
i(messages_unacknowledged, _) ->
@@ -551,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) ->
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
handle_ch_down(ChPid, State);
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -585,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -600,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -621,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -729,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -757,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]);
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d508aa81..376e39c6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,7 +40,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--record(ch, {state, reader_pid, writer_pid,
+-record(ch, {state, reader_pid, writer_pid, limiter_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host,
@@ -96,6 +96,7 @@ init([ReaderPid, WriterPid, Username, VHost]) ->
{ok, #ch{state = starting,
reader_pid = ReaderPid,
writer_pid = WriterPid,
+ limiter_pid = undefined,
transaction_id = none,
tx_participants = sets:new(),
next_tag = 1,
@@ -155,16 +156,20 @@ handle_info(timeout, State) ->
%% {noreply, State, hibernate};
proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
-terminate(_Reason, #ch{writer_pid = WriterPid, state = terminating}) ->
- rabbit_writer:shutdown(WriterPid);
+terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
+ state = terminating}) ->
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid);
-terminate(Reason, State = #ch{writer_pid = WriterPid}) ->
+terminate(Reason, State = #ch{writer_pid = WriterPid,
+ limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
case Reason of
normal -> ok = Res;
_ -> ok
end,
- rabbit_writer:shutdown(WriterPid).
+ rabbit_writer:shutdown(WriterPid),
+ rabbit_limiter:shutdown(LimiterPid).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -277,7 +282,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
@@ -292,7 +297,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
- none -> State#ch{unacked_message_q = Remaining};
+ none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
+ State#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
@@ -335,6 +341,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait},
_, State = #ch{ reader_pid = ReaderPid,
+ limiter_pid = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -352,7 +359,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, self(),
+ Q, NoAck, ReaderPid, self(), LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -416,9 +423,31 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
- {reply, #'basic.qos_ok'{}, State};
+handle_method(#'basic.qos'{global = true}, _, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "global=true", []);
+
+handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
+ rabbit_misc:protocol_error(not_implemented,
+ "prefetch_size!=0 (~w)", [Size]);
+
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{ limiter_pid = LimiterPid }) ->
+ NewLimiterPid = case {LimiterPid, PrefetchCount} of
+ {undefined, 0} ->
+ undefined;
+ {undefined, _} ->
+ LPid = rabbit_limiter:start_link(self()),
+ ok = limit_queues(LPid, State),
+ LPid;
+ {_, 0} ->
+ ok = rabbit_limiter:shutdown(LimiterPid),
+ ok = limit_queues(undefined, State),
+ undefined;
+ {_, _} ->
+ LimiterPid
+ end,
+ ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount),
+ {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}};
handle_method(#'basic.recover'{requeue = true},
_, State = #ch{ transaction_id = none,
@@ -761,7 +790,9 @@ internal_commit(State = #ch{transaction_id = TxnKey,
tx_participants = Participants}) ->
case rabbit_amqqueue:commit_all(sets:to_list(Participants),
TxnKey) of
- ok -> new_tx(State);
+ ok -> ok = notify_limiter(State#ch.limiter_pid,
+ State#ch.uncommitted_ack_q),
+ new_tx(State);
{error, Errors} -> rabbit_misc:protocol_error(
internal_error, "commit failed: ~w", [Errors])
end.
@@ -799,18 +830,36 @@ fold_per_queue(F, Acc0, UAQ) ->
Acc0, D).
notify_queues(#ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(
- [QPid || QueueName <-
- sets:to_list(
- dict:fold(fun (_ConsumerTag, QueueName, S) ->
- sets:add_element(QueueName, S)
- end, sets:new(), Consumers)),
- case rabbit_amqqueue:lookup(QueueName) of
- {ok, Q} -> QPid = Q#amqqueue.pid, true;
- %% queue has been deleted in the meantime
- {error, not_found} -> QPid = none, false
- end],
- self()).
+ rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+
+limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+
+consumer_queues(Consumers) ->
+ [QPid || QueueName <-
+ sets:to_list(
+ dict:fold(fun (_ConsumerTag, QueueName, S) ->
+ sets:add_element(QueueName, S)
+ end, sets:new(), Consumers)),
+ case rabbit_amqqueue:lookup(QueueName) of
+ {ok, Q} -> QPid = Q#amqqueue.pid, true;
+ %% queue has been deleted in the meantime
+ {error, not_found} -> QPid = none, false
+ end].
+
+%% tell the limiter about the number of acks that have been received
+%% for messages delivered to subscribed consumers, but not acks for
+%% messages sent in a response to a basic.get (identified by their
+%% 'none' consumer tag)
+notify_limiter(undefined, _Acked) ->
+ ok;
+notify_limiter(LimiterPid, Acked) ->
+ case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, queue:to_list(Acked)) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(LimiterPid, Count)
+ end.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index ecc285a5..cbc11b40 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -57,7 +57,7 @@ start() ->
true -> ok;
false -> io:format("...done.~n")
end,
- init:stop();
+ halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
error("invalid command '~s'",
[lists:flatten(
@@ -138,7 +138,7 @@ The list_queues, list_exchanges and list_bindings commands accept an optional
virtual host parameter for which to display results. The default value is \"/\".
<QueueInfoItem> must be a member of the list [name, durable, auto_delete,
-arguments, pid, messages_ready, messages_unacknowledged, messages_uncommitted,
+arguments, node, messages_ready, messages_unacknowledged, messages_uncommitted,
messages, acks_uncommitted, consumers, transactions, memory]. The default is
to display name and (number of) messages.
@@ -148,7 +148,7 @@ auto_delete, arguments]. The default is to display name and type.
The output format for \"list_bindings\" is a list of rows containing
exchange name, routing key, queue name and arguments, in that order.
-<ConnectionInfoItem> must be a member of the list [pid, address, port,
+<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
user, peer_address and peer_port.
@@ -242,7 +242,8 @@ action(list_vhost_users, Node, Args = [_VHostPath], Inform) ->
action(list_queues, Node, Args, Inform) ->
Inform("Listing queues", []),
{VHostArg, RemainingArgs} = parse_vhost_flag(Args),
- ArgAtoms = default_if_empty(RemainingArgs, [name, messages]),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(RemainingArgs, [name, messages])),
display_info_list(rpc_call(Node, rabbit_amqqueue, info_all,
[VHostArg, ArgAtoms]),
ArgAtoms);
@@ -267,7 +268,8 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = default_if_empty(Args, [user, peer_address, peer_port]),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(Args, [user, peer_address, peer_port])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms).
@@ -308,9 +310,10 @@ format_info_item(Items, Key) ->
case Info of
{_, #resource{name = Name}} ->
url_encode(Name);
- {Key, IpAddress} when Key =:= address; Key =:= peer_address
- andalso is_tuple(IpAddress) ->
- inet_parse:ntoa(IpAddress);
+ _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
+ inet_parse:ntoa(Value);
+ _ when is_pid(Value) ->
+ atom_to_list(node(Value));
_ when is_binary(Value) ->
url_encode(Value);
_ ->
@@ -357,3 +360,6 @@ url_encode_char([], Acc) ->
d2h(N) when N<10 -> N+$0;
d2h(N) -> N+$a-10.
+list_replace(Find, Replace, List) ->
+ [case X of Find -> Replace; _ -> X end || X <- List].
+
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 925c335c..960e4945 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,11 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_bindings_for_queue/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -77,7 +77,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -88,6 +88,7 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -145,6 +146,8 @@ check_type(<<"direct">>) ->
direct;
check_type(<<"topic">>) ->
topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
rabbit_misc:protocol_error(
command_invalid, "invalid exchange type '~s'", [T]).
@@ -211,54 +214,69 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(route),
+ ExchangeName == Name,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
mnesia:dirty_match_object(
#route{binding = #binding{exchange_name = Name,
_ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
-
-route(X = #exchange{type = fanout}, _) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
-route_internal(#exchange{name = Name}, RoutingKey) ->
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
key = RoutingKey,
@@ -377,7 +395,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
@@ -429,6 +447,67 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
new file mode 100644
index 00000000..20a66ac5
--- /dev/null
+++ b/src/rabbit_limiter.erl
@@ -0,0 +1,195 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_limiter).
+
+-behaviour(gen_server).
+
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([start_link/1, shutdown/1]).
+-export([limit/2, can_send/2, ack/2, register/2, unregister/2]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(maybe_pid() :: pid() | 'undefined').
+
+-spec(start_link/1 :: (pid()) -> pid()).
+-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
+-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(can_send/2 :: (maybe_pid(), pid()) -> bool()).
+-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
+-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+-record(lim, {prefetch_count = 0,
+ ch_pid,
+ queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ volume = 0}).
+%% 'Notify' is a boolean that indicates whether a queue should be
+%% notified of a change in the limit or volume that may allow it to
+%% deliver more messages via the limiter's channel.
+
+%%----------------------------------------------------------------------------
+%% API
+%%----------------------------------------------------------------------------
+
+start_link(ChPid) ->
+ {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ Pid.
+
+shutdown(undefined) ->
+ ok;
+shutdown(LimiterPid) ->
+ unlink(LimiterPid),
+ gen_server:cast(LimiterPid, shutdown).
+
+limit(undefined, 0) ->
+ ok;
+limit(LimiterPid, PrefetchCount) ->
+ gen_server:cast(LimiterPid, {limit, PrefetchCount}).
+
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit
+can_send(undefined, _QPid) ->
+ true;
+can_send(LimiterPid, QPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> true end,
+ fun () -> gen_server:call(LimiterPid, {can_send, QPid}) end).
+
+%% Let the limiter know that the channel has received some acks from a
+%% consumer
+ack(undefined, _Count) -> ok;
+ack(LimiterPid, Count) -> gen_server:cast(LimiterPid, {ack, Count}).
+
+register(undefined, _QPid) -> ok;
+register(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {register, QPid}).
+
+unregister(undefined, _QPid) -> ok;
+unregister(LimiterPid, QPid) -> gen_server:cast(LimiterPid, {unregister, QPid}).
+
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+init([ChPid]) ->
+ {ok, #lim{ch_pid = ChPid} }.
+
+handle_call({can_send, QPid}, _From, State = #lim{volume = Volume}) ->
+ case limit_reached(State) of
+ true -> {reply, false, limit_queue(QPid, State)};
+ false -> {reply, true, State#lim{volume = Volume + 1}}
+ end.
+
+handle_cast(shutdown, State) ->
+ {stop, normal, State};
+
+handle_cast({limit, PrefetchCount}, State) ->
+ {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})};
+
+handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+ NewVolume = if Volume == 0 -> 0;
+ true -> Volume - Count
+ end,
+ {noreply, maybe_notify(State, State#lim{volume = NewVolume})};
+
+handle_cast({register, QPid}, State) ->
+ {noreply, remember_queue(QPid, State)};
+
+handle_cast({unregister, QPid}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, State) ->
+ {noreply, forget_queue(QPid, State)}.
+
+terminate(_, _) ->
+ ok.
+
+code_change(_, State, _) ->
+ State.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing
+%%----------------------------------------------------------------------------
+
+maybe_notify(OldState, NewState) ->
+ case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ true -> notify_queues(NewState);
+ false -> NewState
+ end.
+
+limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+ Limit =/= 0 andalso Volume >= Limit.
+
+remember_queue(QPid, State = #lim{queues = Queues}) ->
+ case dict:is_key(QPid, Queues) of
+ false -> MRef = erlang:monitor(process, QPid),
+ State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ true -> State
+ end.
+
+forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ case dict:find(QPid, Queues) of
+ {ok, {MRef, _}} ->
+ true = erlang:demonitor(MRef),
+ ok = rabbit_amqqueue:unblock(QPid, ChPid),
+ State#lim{queues = dict:erase(QPid, Queues)};
+ error -> State
+ end.
+
+limit_queue(QPid, State = #lim{queues = Queues}) ->
+ UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
+ State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
+
+notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
+ {QList, NewQueues} =
+ dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
+ (QPid, {MRef, true}, {L, D}) ->
+ {[QPid | L], dict:store(QPid, {MRef, false}, D)}
+ end, {[], Queues}, Queues),
+ case length(QList) of
+ 0 -> ok;
+ L ->
+ %% We randomly vary the position of queues in the list,
+ %% thus ensuring that each queue has an equal chance of
+ %% being notified first.
+ {L1, L2} = lists:split(random:uniform(L), QList),
+ [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
+ ok
+ end,
+ State#lim{queues = NewQueues}.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 7f6eaa8e..5e8edd53 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -50,7 +50,7 @@ start() ->
case catch action(Command, Args, RpcTimeout) of
ok ->
io:format("done.~n"),
- init:stop();
+ halt();
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
error("invalid command '~s'",
[lists:flatten(