diff options
author | Matthias Radestock <matthias@lshift.net> | 2009-05-07 08:40:19 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2009-05-07 08:40:19 +0100 |
commit | 6a802eace79c664e84f77c2c38aee143f8da4991 (patch) | |
tree | 02ee05f1d7c07223333e49d59f5669b8e19d1d02 | |
parent | fbd7eaf8e1fd45f322382cfa1d2291809cac4f1a (diff) | |
parent | 2f69044318f570317e3ddc3639e04ef26d8f16b8 (diff) | |
download | rabbitmq-server-6a802eace79c664e84f77c2c38aee143f8da4991.tar.gz |
merge bug20471 into v1_5
-rw-r--r-- | Makefile | 7 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/init.d | 1 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 16 | ||||
-rw-r--r-- | packaging/common/rabbitmq-script-wrapper | 2 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rwxr-xr-x[-rw-r--r--] | packaging/debs/Debian/debian/copyright | 27 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/init.d | 1 | ||||
-rwxr-xr-x | scripts/rabbitmq-multi | 14 | ||||
-rwxr-xr-x | scripts/rabbitmq-multi.bat | 4 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 24 | ||||
-rwxr-xr-x | scripts/rabbitmq-server.bat | 1 | ||||
-rwxr-xr-x | scripts/rabbitmqctl | 5 | ||||
-rwxr-xr-x | scripts/rabbitmqctl.bat | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 64 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
-rw-r--r-- | src/rabbit_control.erl | 13 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 83 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 35 | ||||
-rw-r--r-- | src/rabbit_multi.erl | 11 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 6 |
20 files changed, 224 insertions, 106 deletions
@@ -87,11 +87,10 @@ run-node: all run-tests: all echo "rabbit_tests:all_tests()." | $(ERL_CALL) -start-background-node: stop-node +start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server ; sleep 1 + ./scripts/rabbitmq-server -detached; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -123,7 +122,7 @@ srcdist: distclean cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/README - sed -i 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app + sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit.app && rm -f $(TARGET_SRC_DIR)/ebin/rabbit.app.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ cp codegen.py Makefile $(TARGET_SRC_DIR) diff --git a/packaging/RPMS/Fedora/init.d b/packaging/RPMS/Fedora/init.d index d624e7c7..a9155f3b 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/RPMS/Fedora/init.d @@ -33,7 +33,6 @@ fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 3695c690..54c7def5 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -24,6 +24,7 @@ scalable implementation of an AMQP broker. %define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq +%define _rabbit_wrapper %{_builddir}/`basename %{S:2}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -36,10 +37,10 @@ fi %prep %setup -q -sed -i 's|/usr/lib/|%{_libdir}/|' %{S:1} -sed -i 's|/usr/lib/|%{_libdir}/|' %{S:2} %build +cp %{S:2} %{_rabbit_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} make %{?_smp_mflags} %install @@ -54,9 +55,9 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq #Copy all necessary lib files etc. install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmqctl -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-server -install -p -D -m 0755 %{S:2} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server +install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -100,8 +101,6 @@ fi %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq -%dir %{_localstatedir}/lib/rabbitmq -%dir %{_localstatedir}/log/rabbitmq %dir %{_sysconfdir}/rabbitmq %{_rabbit_erllibdir} %{_rabbit_libdir} @@ -113,6 +112,9 @@ fi rm -rf %{buildroot} %changelog +* Mon Apr 6 2009 Matthias Radestock <matthias@lshift.net> 1.5.4-1 +- Maintenance release for the 1.5.x series + * Tue Feb 24 2009 Tony Garnock-Jones <tonyg@lshift.net> 1.5.3-1 - Maintenance release for the 1.5.x series diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 217d1658..296a77d1 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -9,7 +9,7 @@ for arg in "$@" ; do CMDLINE="${CMDLINE} \"${arg}\"" done -cd / +cd /var/lib/rabbitmq SCRIPT=`basename $0` diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 3be25f48..d1ccd3a0 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (1.5.4-1) hardy; urgency=low + + * New Upstream Release + + -- Matthias Radestock <matthias@lshift.net> Mon, 06 Apr 2009 09:19:32 +0100 + rabbitmq-server (1.5.3-1) hardy; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/copyright b/packaging/debs/Debian/debian/copyright index 854db290..69867220 100644..100755 --- a/packaging/debs/Debian/debian/copyright +++ b/packaging/debs/Debian/debian/copyright @@ -3,9 +3,30 @@ Wed, 3 Jan 2007 15:43:44 +0000. It was downloaded from http://www.rabbitmq.com/ -codegen/amqp-0.8.json is released under the MIT License and is -Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies -LLC, and Rabbit Technologies Ltd. +The file codegen/amqp-0.8.json is covered by the following terms: + + "Copyright (C) 2008-2009 LShift Ltd, Cohesive Financial Technologies LLC, + and Rabbit Technologies Ltd + + Permission is hereby granted, free of charge, to any person + obtaining a copy of this file (the Software), to deal in the + Software without restriction, including without limitation the + rights to use, copy, modify, merge, publish, distribute, + sublicense, and/or sell copies of the Software, and to permit + persons to whom the Software is furnished to do so, subject to + the following conditions: + + The above copyright notice and this permission notice shall be + included in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, + EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + OTHER DEALINGS IN THE SOFTWARE." The rest of this package is licensed under the Mozilla Public License 1.1 Authors and Copyright are as described below: diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d index ef66add5..a35a60ec 100644 --- a/packaging/debs/Debian/debian/init.d +++ b/packaging/debs/Debian/debian/init.d @@ -26,7 +26,6 @@ fi RETVAL=0 set -e -cd / start_rabbitmq () { set +e diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 4cf0703a..1d0c785f 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,23 +29,23 @@ ## ## Contributor(s): ______________________________________. ## +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SCRIPT_HOME=$(dirname $0) +PIDS_FILE=/var/lib/rabbitmq/pids +MULTI_ERL_ARGS= +MULTI_START_ARGS= [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} -[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=$(dirname $0) [ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} -[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=/var/lib/rabbitmq/pids [ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} -[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS= [ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} -[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS= export \ RABBITMQ_NODENAME \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index 30f33a5a..a30c0889 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -47,7 +47,7 @@ if "%RABBITMQ_NODE_PORT%"=="" ( )
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=%~dp0%
+set RABBITMQ_SCRIPT_HOME=%~sdp0%
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
@@ -65,5 +65,5 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmq_multi -s rabbit_multi %START_ARGS% -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_MULTI_ERL_ARGS% -sname rabbitmq_multi -s rabbit_multi %RABBITMQ_MULTI_START_ARGS% -extra %*
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 6273804f..8502d60a 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,28 +30,30 @@ ## Contributor(s): ______________________________________. ## +NODENAME=rabbit +NODE_IP_ADDRESS=0.0.0.0 +NODE_PORT=5672 +SERVER_ERL_ARGS="+K true +A30 \ +-kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ +-kernel inet_default_connect_options [{nodelay,true}]" +CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config +LOG_BASE=/var/log/rabbitmq +MNESIA_BASE=/var/lib/rabbitmq/mnesia +SERVER_START_ARGS= + [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=rabbit [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=0.0.0.0 [ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && RABBITMQ_NODE_PORT=5672 [ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS=${SERVER_ERL_ARGS} -[ "x" = "x$RABBITMQ_SERVER_ERL_ARGS" ] && RABBITMQ_SERVER_ERL_ARGS="+K true +A30 \ --kernel inet_default_listen_options [{nodelay,true},{sndbuf,16384},{recbuf,4096}] \ --kernel inet_default_connect_options [{nodelay,true}]" [ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=${CLUSTER_CONFIG_FILE} -[ "x" = "x$RABBITMQ_CLUSTER_CONFIG_FILE" ] && RABBITMQ_CLUSTER_CONFIG_FILE=/etc/rabbitmq/rabbitmq_cluster.config [ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=${LOG_BASE} -[ "x" = "x$RABBITMQ_LOG_BASE" ] && RABBITMQ_LOG_BASE=/var/log/rabbitmq [ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=${MNESIA_BASE} -[ "x" = "x$RABBITMQ_MNESIA_BASE" ] && RABBITMQ_MNESIA_BASE=/var/lib/rabbitmq/mnesia +[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} + [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${MNESIA_DIR} [ "x" = "x$RABBITMQ_MNESIA_DIR" ] && RABBITMQ_MNESIA_DIR=${RABBITMQ_MNESIA_BASE}/${RABBITMQ_NODENAME} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS=${SERVER_START_ARGS} -[ "x" = "x$RABBITMQ_SERVER_START_ARGS" ] && RABBITMQ_SERVER_START_ARGS= ## Log rotation [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS} diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 5b20ef20..9915727b 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -117,6 +117,7 @@ if "%RABBITMQ_MNESIA_DIR%"=="" ( -kernel inet_default_connect_options "[{nodelay, true}]" ^
-rabbit tcp_listeners "[{\"%RABBITMQ_NODE_IP_ADDRESS%\", %RABBITMQ_NODE_PORT%}]" ^
-kernel error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%.log"\"} ^
+%RABBITMQ_SERVER_ERL_ARGS% ^
-sasl errlog_type error ^
-sasl sasl_error_logger {file,\""%RABBITMQ_LOG_BASE%/%RABBITMQ_NODENAME%-sasl.log"\"} ^
-os_mon start_cpu_sup true ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index b941b850..c57978c0 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,10 +30,15 @@ ## Contributor(s): ______________________________________. ## +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf + +[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} + exec erl \ -pa "`dirname $0`/../ebin" \ -noinput \ -hidden \ + ${RABBITMQ_CTL_ERL_ARGS} \ -sname rabbitmqctl$$ \ -s rabbit_control \ -extra "$@" diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 33a10777..e4dccfba 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -46,4 +46,4 @@ if not exist "%ERLANG_HOME%\bin\erl.exe" ( exit /B
)
-"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -sname rabbitmqctl -s rabbit_control -extra %*
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden %RABBITMQ_CTL_ERL_ARGS% -sname rabbitmqctl -s rabbit_control -extra %*
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2b9abb29..382810c3 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -122,19 +122,32 @@ recover() -> recover_durable_queues() -> Node = node(), - %% TODO: use dirty ops instead - R = rabbit_misc:execute_mnesia_transaction( - fun () -> - qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(durable_queues), - node(Pid) == Node])) - end), - Queues = lists:map(fun start_queue_process/1, R), - rabbit_misc:execute_mnesia_transaction( - fun () -> - lists:foreach(fun store_queue/1, Queues), - ok - end). + lists:foreach( + fun (RecoveredQ) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client connected to + %% another node has deleted the queue (and possibly + %% re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> case mnesia:match_object( + durable_queues, RecoveredQ, read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> ok; + false -> exit(Q#amqqueue.pid, shutdown) + end + end, + %% TODO: use dirty ops instead + rabbit_misc:execute_mnesia_transaction( + fun () -> + qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid} + <- mnesia:table(durable_queues), + node(Pid) == Node])) + end)), + ok. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -280,28 +293,29 @@ internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({amqqueue, QueueName}) of - [] -> {error, not_found}; - [Q] -> - ok = delete_queue(Q), + [] -> {error, not_found}; + [_] -> + ok = rabbit_exchange:delete_queue_bindings(QueueName), + ok = mnesia:delete({amqqueue, QueueName}), ok = mnesia:delete({durable_queues, QueueName}), ok end end). -delete_queue(#amqqueue{name = QueueName}) -> - ok = rabbit_exchange:delete_bindings_for_queue(QueueName), - ok = mnesia:delete({amqqueue, QueueName}), - ok. - on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:fold( - fun (Q, Acc) -> ok = delete_queue(Q), Acc end, + fun (QueueName, Acc) -> + ok = rabbit_exchange:delete_transient_queue_bindings( + QueueName), + ok = mnesia:delete({amqqueue, QueueName}), + Acc + end, ok, - qlc:q([Q || Q = #amqqueue{pid = Pid} - <- mnesia:table(amqqueue), - node(Pid) == Node])) + qlc:q([QueueName || #amqqueue{name = QueueName, pid = Pid} + <- mnesia:table(amqqueue), + node(Pid) == Node])) end). pseudo_queue(QueueName, Pid) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4bf2f446..5fd9a512 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -664,12 +664,16 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, State), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of + {error, exchange_not_found} -> + rabbit_misc:protocol_error( + not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); {error, queue_not_found} -> rabbit_misc:protocol_error( not_found, "no ~s", [rabbit_misc:rs(QueueName)]); - {error, exchange_not_found} -> + {error, exchange_and_queue_not_found} -> rabbit_misc:protocol_error( - not_found, "no ~s", [rabbit_misc:rs(ExchangeName)]); + not_found, "no ~s and no ~s", [rabbit_misc:rs(ExchangeName), + rabbit_misc:rs(QueueName)]); {error, binding_not_found} -> rabbit_misc:protocol_error( not_found, "no binding ~s between ~s and ~s", diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cbc11b40..352d7e75 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -38,6 +38,19 @@ -define(RPC_TIMEOUT, 30000). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). +-spec(action/4 :: (atom(), erlang_node(), [string()], + fun ((string(), [any()]) -> 'ok')) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335c..7f3a78e9 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,7 @@ route/2]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). --export([delete_bindings_for_queue/1]). +-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([check_type/1, assert_type/2, topic_matches/2]). %% EXTENDED API @@ -59,8 +59,10 @@ -type(publish_res() :: {'ok', [pid()]} | not_found() | {'error', 'unroutable' | 'not_delivered'}). --type(bind_res() :: 'ok' | - {'error', 'queue_not_found' | 'exchange_not_found'}). +-type(bind_res() :: 'ok' | {'error', + 'queue_not_found' | + 'exchange_not_found' | + 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). -spec(declare/5 :: (exchange_name(), exchange_type(), bool(), bool(), amqp_table()) -> exchange()). @@ -86,7 +88,8 @@ bind_res() | {'error', 'binding_not_found'}). -spec(list_bindings/1 :: (vhost()) -> [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). --spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). +-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). +-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). @@ -102,22 +105,15 @@ -define(INFO_KEYS, [name, type, durable, auto_delete, arguments]. recover() -> - rabbit_misc:execute_mnesia_transaction( - fun () -> - mnesia:foldl( - fun (Exchange, Acc) -> - ok = mnesia:write(Exchange), - Acc - end, ok, durable_exchanges), - mnesia:foldl( - fun (Route, Acc) -> - {_, ReverseRoute} = route_with_reverse(Route), - ok = mnesia:write(Route), - ok = mnesia:write(ReverseRoute), - Acc - end, ok, durable_routes), - ok - end). + ok = rabbit_misc:table_foreach( + fun(Exchange) -> ok = mnesia:write(Exchange) end, + durable_exchanges), + ok = rabbit_misc:table_foreach( + fun(Route) -> {_, ReverseRoute} = route_with_reverse(Route), + ok = mnesia:write(Route), + ok = mnesia:write(ReverseRoute) + end, durable_routes), + ok. declare(ExchangeName, Type, Durable, AutoDelete, Args) -> Exchange = #exchange{name = ExchangeName, @@ -278,18 +274,24 @@ lookup_qpids(Queues) -> %% refactored to its own module, especially seeing as unbind will have %% to be implemented for 0.91 ? -delete_bindings_for_exchange(ExchangeName) -> +delete_exchange_bindings(ExchangeName) -> indexed_delete( #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, fun delete_forward_routes/1, fun mnesia:delete_object/1). -delete_bindings_for_queue(QueueName) -> +delete_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_forward_routes/1). + +delete_transient_queue_bindings(QueueName) -> + delete_queue_bindings(QueueName, fun delete_transient_forward_routes/1). + +delete_queue_bindings(QueueName, FwdDeleteFun) -> Exchanges = exchanges_for_queue(QueueName), indexed_delete( reverse_route(#route{binding = #binding{queue_name = QueueName, _ = '_'}}), - fun mnesia:delete_object/1, fun delete_forward_routes/1), + fun mnesia:delete_object/1, FwdDeleteFun), [begin [X] = mnesia:read({exchange, ExchangeName}), ok = maybe_auto_delete(X) @@ -307,6 +309,9 @@ delete_forward_routes(Route) -> ok = mnesia:delete_object(Route), ok = mnesia:delete_object(durable_routes, Route, write). +delete_transient_forward_routes(Route) -> + ok = mnesia:delete_object(Route). + exchanges_for_queue(QueueName) -> MatchHead = reverse_route( #route{binding = #binding{exchange_name = '$1', @@ -316,15 +321,13 @@ exchanges_for_queue(QueueName) -> sets:from_list( mnesia:select(reverse_route, [{MatchHead, [], ['$1']}]))). -has_bindings(ExchangeName) -> - MatchHead = #route{binding = #binding{exchange_name = ExchangeName, - _ = '_'}}, +contains(Table, MatchHead) -> try - continue(mnesia:select(route, [{MatchHead, [], ['$_']}], 1, read)) + continue(mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read)) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - case mnesia:match_object(MatchHead) of + case mnesia:match_object(Table, MatchHead, read) of [] -> false; [_|_] -> true end @@ -337,18 +340,20 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). call_with_exchange(Exchange, Fun) -> rabbit_misc:execute_mnesia_transaction( fun() -> case mnesia:read({exchange, Exchange}) of - [] -> {error, exchange_not_found}; + [] -> {error, not_found}; [X] -> Fun(X) end end). call_with_exchange_and_queue(Exchange, Queue, Fun) -> - call_with_exchange( - Exchange, - fun(X) -> case mnesia:read({amqqueue, Queue}) of - [] -> {error, queue_not_found}; - [Q] -> Fun(X, Q) - end + rabbit_misc:execute_mnesia_transaction( + fun() -> case {mnesia:read({exchange, Exchange}), + mnesia:read({amqqueue, Queue})} of + {[X], [Q]} -> Fun(X, Q); + {[ ], [_]} -> {error, exchange_not_found}; + {[_], [ ]} -> {error, queue_not_found}; + {[ ], [ ]} -> {error, exchange_and_queue_not_found} + end end). add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> @@ -468,13 +473,17 @@ maybe_auto_delete(Exchange = #exchange{auto_delete = true}) -> ok. conditional_delete(Exchange = #exchange{name = ExchangeName}) -> - case has_bindings(ExchangeName) of + Match = #route{binding = #binding{exchange_name = ExchangeName, _ = '_'}}, + %% we need to check for durable routes here too in case a bunch of + %% routes to durable queues have been removed temporarily as a + %% result of a node failure + case contains(route, Match) orelse contains(durable_routes, Match) of false -> unconditional_delete(Exchange); true -> {error, in_use} end. unconditional_delete(#exchange{name = ExchangeName}) -> - ok = delete_bindings_for_exchange(ExchangeName), + ok = delete_exchange_bindings(ExchangeName), ok = mnesia:delete({durable_exchanges, ExchangeName}), ok = mnesia:delete({exchange, ExchangeName}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 053bde54..1fcd9a61 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,6 +46,7 @@ -export([ensure_ok/2]). -export([localnode/1, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). +-export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). @@ -97,13 +98,14 @@ -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). -spec(map_in_order/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(table_foreach/2 :: (fun ((any()) -> any()), atom()) -> 'ok'). -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). -spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). -spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). --spec(format_stderr/2 :: (string(), [any()]) -> 'true'). +-spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -endif. @@ -295,6 +297,21 @@ map_in_order(F, L) -> lists:reverse( lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). +%% For each entry in a table, execute a function in a transaction. +%% This is often far more efficient than wrapping a tx around the lot. +%% +%% We ignore entries that have been modified or removed. +table_foreach(F, TableName) -> + lists:foreach( + fun (E) -> execute_mnesia_transaction( + fun () -> case mnesia:match_object(TableName, E, read) of + [] -> ok; + _ -> F(E) + end + end) + end, dirty_read_all(TableName)), + ok. + dirty_read_all(TableName) -> mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). @@ -355,6 +372,16 @@ ensure_parent_dirs_exist(Filename) -> end. format_stderr(Fmt, Args) -> - Port = open_port({fd, 0, 2}, [out]), - port_command(Port, io_lib:format(Fmt, Args)), - port_close(Port). + case os:type() of + {unix, _} -> + Port = open_port({fd, 0, 2}, [out]), + port_command(Port, io_lib:format(Fmt, Args)), + port_close(Port); + {win32, _} -> + %% stderr on Windows is buffered and I can't figure out a + %% way to trigger a fflush(stderr) in Erlang. So rather + %% than risk losing output we write to stdout instead, + %% which appears to be unbuffered. + io:format(Fmt, Args) + end, + ok. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index 5e8edd53..d9197535 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -36,6 +36,17 @@ -define(RPC_SLEEP, 500). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start/0 :: () -> no_return()). +-spec(stop/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + start() -> RpcTimeout = case init:get_argument(maxwait) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df2e71d9..6706ecd1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -430,7 +430,13 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(stop_app, []), {error, {no_running_cluster_nodes, _, _}} = control_action(reset, []), + + %% leave system clustered, with the secondary node as a ram node ok = control_action(force_reset, []), + ok = control_action(start_app, []), + ok = control_action(force_reset, SecondaryNode, []), + ok = control_action(cluster, SecondaryNode, [NodeS]), + ok = control_action(start_app, SecondaryNode, []), passed. |