summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-05-07 08:40:19 +0100
committerMatthias Radestock <matthias@lshift.net>2009-05-07 08:40:19 +0100
commit6a802eace79c664e84f77c2c38aee143f8da4991 (patch)
tree02ee05f1d7c07223333e49d59f5669b8e19d1d02
parentfbd7eaf8e1fd45f322382cfa1d2291809cac4f1a (diff)
parent2f69044318f570317e3ddc3639e04ef26d8f16b8 (diff)
downloadrabbitmq-server-6a802eace79c664e84f77c2c38aee143f8da4991.tar.gz
merge bug20471 into v1_5
-rw-r--r--Makefile7
-rw-r--r--packaging/RPMS/Fedora/init.d1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec16
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-x[-rw-r--r--]packaging/debs/Debian/debian/copyright27
-rw-r--r--packaging/debs/Debian/debian/init.d1
-rwxr-xr-xscripts/rabbitmq-multi14
-rwxr-xr-xscripts/rabbitmq-multi.bat4
-rwxr-xr-xscripts/rabbitmq-server24
-rwxr-xr-xscripts/rabbitmq-server.bat1
-rwxr-xr-xscripts/rabbitmqctl5
-rwxr-xr-xscripts/rabbitmqctl.bat2
-rw-r--r--src/rabbit_amqqueue.erl64
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_control.erl13
-rw-r--r--src/rabbit_exchange.erl83
-rw-r--r--src/rabbit_misc.erl35
-rw-r--r--src/rabbit_multi.erl11
-rw-r--r--src/rabbit_tests.erl6
20 files changed, 224 insertions, 106 deletions
diff --git a/Makefile b/Makefile
index e0f01f4c..64e008c1 100644
--- a/Makefile
+++ b/Makefile
@@ -87,11 +87,10 @@ run-node: all
run-tests: all
echo "rabbit_tests:all_tests()." | $(ERL_CALL)
-start-background-node: stop-node
+start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
- RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
- ./scripts/rabbitmq-server ; sleep 1
+ ./scripts/rabbitmq-server -detached; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
@@ -123,7 +122,7 @@ srcdist: distclean
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/README
- sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app
+ sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app && rm -f $(TARGET_SRC_DIR)/ebin/rabbit.app.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile $(TARGET_SRC_DIR)
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d
index d624e7c7..a9155f3b 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/RPMS/Fedora/init.d
@@ -33,7 +33,6 @@ fi
RETVAL=0
set -e
-cd /
start_rabbitmq () {
set +e
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 3695c690..54c7def5 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -24,6 +24,7 @@ scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
%define _rabbit_libdir %{_libdir}/rabbitmq
+%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -36,10 +37,10 @@ fi
%prep
%setup -q
-sed -i 's|/usr/lib/|%{_libdir}/|' %{S:1}
-sed -i 's|/usr/lib/|%{_libdir}/|' %{S:2}
%build
+cp %{S:2} %{_rabbit_wrapper}
+sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
make %{?_smp_mflags}
%install
@@ -54,9 +55,9 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq
#Copy all necessary lib files etc.
install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmqctl
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-server
-install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-multi
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
+install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
@@ -100,8 +101,6 @@ fi
%defattr(-,root,root,-)
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq
%attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq
-%dir %{_localstatedir}/lib/rabbitmq
-%dir %{_localstatedir}/log/rabbitmq
%dir %{_sysconfdir}/rabbitmq
%{_rabbit_erllibdir}
%{_rabbit_libdir}
@@ -113,6 +112,9 @@ fi
rm -rf %{buildroot}
%changelog
+* Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1
+- Maintenance release for the 1.5.x series
+
* Tue Feb 24 2009 Tony Garnock-Jones <tonyg@lshift.net> 1.5.3-1
- Maintenance release for the 1.5.x series
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 217d1658..296a77d1 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -9,7 +9,7 @@ for arg in "$@" ; do
CMDLINE="${CMDLINE} \"${arg}\""
done
-cd /
+cd /var/lib/rabbitmq
SCRIPT=`basename $0`
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 3be25f48..d1ccd3a0 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (1.5.4-1) hardy; urgency=low
+
+ * New Upstream Release
+
+ -- Matthias Radestock <matthias@lshift.net> Mon, 06 Apr 2009 09:19:32 +0100
+
rabbitmq-server (1.5.3-1) hardy; urgency=low
* New Upstream Release
diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright
index 854db290..69867220 100644..100755
--- a/packaging/debs/Debian/debian/copyright
+++ b/packaging/debs/Debian/debian/copyright
@@ -3,9 +3,30 @@ Wed, 3 Jan 2007 15:43:44 +0000.
It was downloaded from http://www.rabbitmq.com/
-codegen/amqp-0.8.json is released under the MIT License and is
-Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies
-LLC, and Rabbit Technologies Ltd.
+The file codegen/amqp-0.8.json is covered by the following terms:
+
+ "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC,
+ and Rabbit Technologies Ltd
+
+ Permission is hereby granted, free of charge, to any person
+ obtaining a copy of this file (the Software), to deal in the
+ Software without restriction, including without limitation the
+ rights to use, copy, modify, merge, publish, distribute,
+ sublicense, and/or sell copies of the Software, and to permit
+ persons to whom the Software is furnished to do so, subject to
+ the following conditions:
+
+ The above copyright notice and this permission notice shall be
+ included in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ OTHER DEALINGS IN THE SOFTWARE."
The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
index ef66add5..a35a60ec 100644
--- a/packaging/debs/Debian/debian/init.d
+++ b/packaging/debs/Debian/debian/init.d
@@ -26,7 +26,6 @@ fi
RETVAL=0
set -e
-cd /
start_rabbitmq () {
set +e
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 4cf0703a..1d0c785f 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,23 +29,23 @@
##
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+NODE_IP_ADDRESS=0.0.0.0
+NODE_PORT=5672
+SCRIPT_HOME=$(dirname $0)
+PIDS_FILE=/var/lib/rabbitmq/pids
+MULTI_ERL_ARGS=
+MULTI_START_ARGS=
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672
[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME}
-[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=$(dirname $0)
[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE}
-[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=/var/lib/rabbitmq/pids
[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS}
-[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=
[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS}
-[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=
export \
RABBITMQ_NODENAME \
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index 30f33a5a..a30c0889 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -47,7 +47,7 @@ if "%RABBITMQ_NODE_PORT%"=="" (
)
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=%~dp0%
+set RABBITMQ_SCRIPT_HOME=%~sdp0%
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
@@ -65,5 +65,5 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmq_multi -s rabbit_multi %START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 6273804f..8502d60a 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,28 +30,30 @@
## Contributor(s): ______________________________________.
##
+NODENAME=rabbit
+NODE_IP_ADDRESS=0.0.0.0
+NODE_PORT=5672
+SERVER_ERL_ARGS="+K true +A30 \
+-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
+-kernel inet_default_connect_options [{nodelay,true}]"
+CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
+LOG_BASE=/var/log/rabbitmq
+MNESIA_BASE=/var/lib/rabbitmq/mnesia
+SERVER_START_ARGS=
+
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
-[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
-[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT}
-[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672
[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS}
-[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS="+K true +A30 \
--kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \
--kernel inet_default_connect_options [{nodelay,true}]"
[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE}
-[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config
[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE}
-[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=/var/log/rabbitmq
[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE}
-[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia
+[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
+
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR}
[ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS}
-[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=
## Log rotation
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 5b20ef20..9915727b 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -117,6 +117,7 @@ if "%RABBITMQ_MNESIA_DIR%"=="" (
-kernel inet_default_connect_options "[{nodelay, true}]" ^
-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
+%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
-sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index b941b850..c57978c0 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,10 +30,15 @@
## Contributor(s): ______________________________________.
##
+[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+
+[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
+
exec erl \
-pa "`dirname $0`/../ebin" \
-noinput \
-hidden \
+ ${RABBITMQ_CTL_ERL_ARGS} \
-sname rabbitmqctl$$ \
-s rabbit_control \
-extra "$@"
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 33a10777..e4dccfba 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" (
exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2b9abb29..382810c3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -122,19 +122,32 @@ recover() ->
recover_durable_queues() ->
Node = node(),
- %% TODO: use dirty ops instead
- R = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(durable_queues),
- node(Pid) == Node]))
- end),
- Queues = lists:map(fun start_queue_process/1, R),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- lists:foreach(fun store_queue/1, Queues),
- ok
- end).
+ lists:foreach(
+ fun (RecoveredQ) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ durable_queues, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> ok;
+ false -> exit(Q#amqqueue.pid, shutdown)
+ end
+ end,
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(durable_queues),
+ node(Pid) == Node]))
+ end)),
+ ok.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -280,28 +293,29 @@ internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({amqqueue, QueueName}) of
- [] -> {error, not_found};
- [Q] ->
- ok = delete_queue(Q),
+ [] -> {error, not_found};
+ [_] ->
+ ok = rabbit_exchange:delete_queue_bindings(QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
ok = mnesia:delete({durable_queues, QueueName}),
ok
end
end).
-delete_queue(#amqqueue{name = QueueName}) ->
- ok = rabbit_exchange:delete_bindings_for_queue(QueueName),
- ok = mnesia:delete({amqqueue, QueueName}),
- ok.
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
qlc:fold(
- fun (Q, Acc) -> ok = delete_queue(Q), Acc end,
+ fun (QueueName, Acc) ->
+ ok = rabbit_exchange:delete_transient_queue_bindings(
+ QueueName),
+ ok = mnesia:delete({amqqueue, QueueName}),
+ Acc
+ end,
ok,
- qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(amqqueue),
- node(Pid) == Node]))
+ qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid}
+ <- mnesia:table(amqqueue),
+ node(Pid) == Node]))
end).
pseudo_queue(QueueName, Pid) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4bf2f446..5fd9a512 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -664,12 +664,16 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ {error, exchange_not_found} ->
+ rabbit_misc:protocol_error(
+ not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
{error, queue_not_found} ->
rabbit_misc:protocol_error(
not_found, "no ~s", [rabbit_misc:rs(QueueName)]);
- {error, exchange_not_found} ->
+ {error, exchange_and_queue_not_found} ->
rabbit_misc:protocol_error(
- not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]);
+ not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName),
+ rabbit_misc:rs(QueueName)]);
{error, binding_not_found} ->
rabbit_misc:protocol_error(
not_found, "no binding ~s between ~s and ~s",
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index cbc11b40..352d7e75 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -38,6 +38,19 @@
-define(RPC_TIMEOUT, 30000).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+-spec(action/4 :: (atom(), erlang_node(), [string()],
+ fun ((string(), [any()]) -> 'ok')) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 925c335c..7f3a78e9 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,7 @@
route/2]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
--export([delete_bindings_for_queue/1]).
+-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([check_type/1, assert_type/2, topic_matches/2]).
%% EXTENDED API
@@ -59,8 +59,10 @@
-type(publish_res() :: {'ok', [pid()]} |
not_found() | {'error', 'unroutable' | 'not_delivered'}).
--type(bind_res() :: 'ok' |
- {'error', 'queue_not_found' | 'exchange_not_found'}).
+-type(bind_res() :: 'ok' | {'error',
+ 'queue_not_found' |
+ 'exchange_not_found' |
+ 'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(),
amqp_table()) -> exchange()).
@@ -86,7 +88,8 @@
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
--spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
+-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
+-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
@@ -102,22 +105,15 @@
-define(INFO_KEYS, [name, type, durable, auto_delete, arguments].
recover() ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- mnesia:foldl(
- fun (Exchange, Acc) ->
- ok = mnesia:write(Exchange),
- Acc
- end, ok, durable_exchanges),
- mnesia:foldl(
- fun (Route, Acc) ->
- {_, ReverseRoute} = route_with_reverse(Route),
- ok = mnesia:write(Route),
- ok = mnesia:write(ReverseRoute),
- Acc
- end, ok, durable_routes),
- ok
- end).
+ ok = rabbit_misc:table_foreach(
+ fun(Exchange) -> ok = mnesia:write(Exchange) end,
+ durable_exchanges),
+ ok = rabbit_misc:table_foreach(
+ fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route),
+ ok = mnesia:write(Route),
+ ok = mnesia:write(ReverseRoute)
+ end, durable_routes),
+ ok.
declare(ExchangeName, Type, Durable, AutoDelete, Args) ->
Exchange = #exchange{name = ExchangeName,
@@ -278,18 +274,24 @@ lookup_qpids(Queues) ->
%% refactored to its own module, especially seeing as unbind will have
%% to be implemented for 0.91 ?
-delete_bindings_for_exchange(ExchangeName) ->
+delete_exchange_bindings(ExchangeName) ->
indexed_delete(
#route{binding = #binding{exchange_name = ExchangeName,
_ = '_'}},
fun delete_forward_routes/1, fun mnesia:delete_object/1).
-delete_bindings_for_queue(QueueName) ->
+delete_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_forward_routes/1).
+
+delete_transient_queue_bindings(QueueName) ->
+ delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1).
+
+delete_queue_bindings(QueueName, FwdDeleteFun) ->
Exchanges = exchanges_for_queue(QueueName),
indexed_delete(
reverse_route(#route{binding = #binding{queue_name = QueueName,
_ = '_'}}),
- fun mnesia:delete_object/1, fun delete_forward_routes/1),
+ fun mnesia:delete_object/1, FwdDeleteFun),
[begin
[X] = mnesia:read({exchange, ExchangeName}),
ok = maybe_auto_delete(X)
@@ -307,6 +309,9 @@ delete_forward_routes(Route) ->
ok = mnesia:delete_object(Route),
ok = mnesia:delete_object(durable_routes, Route, write).
+delete_transient_forward_routes(Route) ->
+ ok = mnesia:delete_object(Route).
+
exchanges_for_queue(QueueName) ->
MatchHead = reverse_route(
#route{binding = #binding{exchange_name = '$1',
@@ -316,15 +321,13 @@ exchanges_for_queue(QueueName) ->
sets:from_list(
mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))).
-has_bindings(ExchangeName) ->
- MatchHead = #route{binding = #binding{exchange_name = ExchangeName,
- _ = '_'}},
+contains(Table, MatchHead) ->
try
- continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read))
+ continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read))
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- case mnesia:match_object(MatchHead) of
+ case mnesia:match_object(Table, MatchHead, read) of
[] -> false;
[_|_] -> true
end
@@ -337,18 +340,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
call_with_exchange(Exchange, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> case mnesia:read({exchange, Exchange}) of
- [] -> {error, exchange_not_found};
+ [] -> {error, not_found};
[X] -> Fun(X)
end
end).
call_with_exchange_and_queue(Exchange, Queue, Fun) ->
- call_with_exchange(
- Exchange,
- fun(X) -> case mnesia:read({amqqueue, Queue}) of
- [] -> {error, queue_not_found};
- [Q] -> Fun(X, Q)
- end
+ rabbit_misc:execute_mnesia_transaction(
+ fun() -> case {mnesia:read({exchange, Exchange}),
+ mnesia:read({amqqueue, Queue})} of
+ {[X], [Q]} -> Fun(X, Q);
+ {[ ], [_]} -> {error, exchange_not_found};
+ {[_], [ ]} -> {error, queue_not_found};
+ {[ ], [ ]} -> {error, exchange_and_queue_not_found}
+ end
end).
add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
@@ -468,13 +473,17 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) ->
ok.
conditional_delete(Exchange = #exchange{name = ExchangeName}) ->
- case has_bindings(ExchangeName) of
+ Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}},
+ %% we need to check for durable routes here too in case a bunch of
+ %% routes to durable queues have been removed temporarily as a
+ %% result of a node failure
+ case contains(route, Match) orelse contains(durable_routes, Match) of
false -> unconditional_delete(Exchange);
true -> {error, in_use}
end.
unconditional_delete(#exchange{name = ExchangeName}) ->
- ok = delete_bindings_for_exchange(ExchangeName),
+ ok = delete_exchange_bindings(ExchangeName),
ok = mnesia:delete({durable_exchanges, ExchangeName}),
ok = mnesia:delete({exchange, ExchangeName}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 053bde54..1fcd9a61 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -46,6 +46,7 @@
-export([ensure_ok/2]).
-export([localnode/1, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
+-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
@@ -97,13 +98,14 @@
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
-spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok').
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
--spec(format_stderr/2 :: (string(), [any()]) -> 'true').
+-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-endif.
@@ -295,6 +297,21 @@ map_in_order(F, L) ->
lists:reverse(
lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)).
+%% For each entry in a table, execute a function in a transaction.
+%% This is often far more efficient than wrapping a tx around the lot.
+%%
+%% We ignore entries that have been modified or removed.
+table_foreach(F, TableName) ->
+ lists:foreach(
+ fun (E) -> execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(TableName, E, read) of
+ [] -> ok;
+ _ -> F(E)
+ end
+ end)
+ end, dirty_read_all(TableName)),
+ ok.
+
dirty_read_all(TableName) ->
mnesia:dirty_select(TableName, [{'$1',[],['$1']}]).
@@ -355,6 +372,16 @@ ensure_parent_dirs_exist(Filename) ->
end.
format_stderr(Fmt, Args) ->
- Port = open_port({fd, 0, 2}, [out]),
- port_command(Port, io_lib:format(Fmt, Args)),
- port_close(Port).
+ case os:type() of
+ {unix, _} ->
+ Port = open_port({fd, 0, 2}, [out]),
+ port_command(Port, io_lib:format(Fmt, Args)),
+ port_close(Port);
+ {win32, _} ->
+ %% stderr on Windows is buffered and I can't figure out a
+ %% way to trigger a fflush(stderr) in Erlang. So rather
+ %% than risk losing output we write to stdout instead,
+ %% which appears to be unbuffered.
+ io:format(Fmt, Args)
+ end,
+ ok.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index 5e8edd53..d9197535 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -36,6 +36,17 @@
-define(RPC_SLEEP, 500).
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> no_return()).
+-spec(stop/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start() ->
RpcTimeout =
case init:get_argument(maxwait) of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df2e71d9..6706ecd1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -430,7 +430,13 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
{error, {no_running_cluster_nodes, _, _}} =
control_action(reset, []),
+
+ %% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
+ ok = control_action(start_app, []),
+ ok = control_action(force_reset, SecondaryNode, []),
+ ok = control_action(cluster, SecondaryNode, [NodeS]),
+ ok = control_action(start_app, SecondaryNode, []),
passed.