diff options
author | Marek Majkowski <marek@rabbitmq.com> | 2011-02-25 12:18:02 +0000 |
---|---|---|
committer | Marek Majkowski <marek@rabbitmq.com> | 2011-02-25 12:18:02 +0000 |
commit | 2bf0e84461acaeedb9595870871407c48a302a60 (patch) | |
tree | 18d240329b52862d2c327e07531c8ffa0c2348e1 | |
parent | d7c926b9377343878f7bc263b8d44f6a1ae1cc8d (diff) | |
parent | 4e4eadb296da2eedf081d71f44568068102d80a0 (diff) | |
download | rabbitmq-server-2bf0e84461acaeedb9595870871407c48a302a60.tar.gz |
default merged into bug23799
34 files changed, 739 insertions, 1065 deletions
@@ -18,7 +18,7 @@ TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://www.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) -USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml $(DOCS_DIR)/rabbitmq-multi.1.xml +USAGES_XML=$(DOCS_DIR)/rabbitmqctl.1.xml USAGES_ERL=$(foreach XML, $(USAGES_XML), $(call usage_xml_to_erl, $(XML))) ifeq ($(shell python -c 'import simplejson' 2>/dev/null && echo yes),yes) @@ -268,7 +268,7 @@ install_bin: all install_dirs cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi; do \ + for script in rabbitmq-env rabbitmq-server rabbitmqctl; do \ cp scripts/$$script $(TARGET_DIR)/sbin; \ [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ done diff --git a/docs/rabbitmq-env.conf.5.xml b/docs/rabbitmq-env.conf.5.xml index 4c7340c2..c887596c 100644 --- a/docs/rabbitmq-env.conf.5.xml +++ b/docs/rabbitmq-env.conf.5.xml @@ -76,7 +76,6 @@ NODENAME=hare <refsect1> <title>See also</title> <para> - <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> diff --git a/docs/rabbitmq-multi.1.xml b/docs/rabbitmq-multi.1.xml deleted file mode 100644 index 5f5c6c2f..00000000 --- a/docs/rabbitmq-multi.1.xml +++ /dev/null @@ -1,100 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN" "http://www.docbook.org/xml/4.5/docbookx.dtd"> -<refentry lang="en"> - <refentryinfo> - <productname>RabbitMQ Server</productname> - <authorgroup> - <corpauthor>The RabbitMQ Team <<ulink url="mailto:info@rabbitmq.com"><email>info@rabbitmq.com</email></ulink>></corpauthor> - </authorgroup> - </refentryinfo> - - <refmeta> - <refentrytitle>rabbitmq-multi</refentrytitle> - <manvolnum>1</manvolnum> - <refmiscinfo class="manual">RabbitMQ Server</refmiscinfo> - </refmeta> - - <refnamediv> - <refname>rabbitmq-multi</refname> - <refpurpose>start/stop local cluster RabbitMQ nodes</refpurpose> - </refnamediv> - - <refsynopsisdiv> - <cmdsynopsis> - <command>rabbitmq-multi</command> - <arg choice="req"><replaceable>command</replaceable></arg> - <arg choice="opt" rep="repeat"><replaceable>command options</replaceable></arg> - </cmdsynopsis> - </refsynopsisdiv> - - <refsect1> - <title>Description</title> - <para> - RabbitMQ is an implementation of AMQP, the emerging standard for high -performance enterprise messaging. The RabbitMQ server is a robust and -scalable implementation of an AMQP broker. - </para> - <para> -rabbitmq-multi scripts allows for easy set-up of a cluster on a single -machine. - </para> - </refsect1> - - <refsect1> - <title>Commands</title> - <variablelist> - <varlistentry> - <term><cmdsynopsis><command>start_all</command> <arg choice="req"><replaceable>count</replaceable></arg></cmdsynopsis></term> - <listitem> - <para> -Start count nodes with unique names, listening on all IP addresses and -on sequential ports starting from 5672. - </para> - <para role="example-prefix">For example:</para> - <screen role="example">rabbitmq-multi start_all 3</screen> - <para role="example"> - Starts 3 local RabbitMQ nodes with unique, sequential port numbers. - </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>status</command></cmdsynopsis></term> - <listitem> - <para> -Print the status of all running RabbitMQ nodes. - </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>stop_all</command></cmdsynopsis></term> - <listitem> - <para> -Stop all local RabbitMQ nodes, - </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>rotate_logs</command></cmdsynopsis></term> - <listitem> - <para> -Rotate log files for all local and running RabbitMQ nodes. - </para> - </listitem> - </varlistentry> - - </variablelist> - </refsect1> - - - <refsect1> - <title>See also</title> - <para> - <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> - <citerefentry><refentrytitle>rabbitmq-server</refentrytitle><manvolnum>1</manvolnum></citerefentry> - <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> - </para> - </refsect1> -</refentry> diff --git a/docs/rabbitmq-server.1.xml b/docs/rabbitmq-server.1.xml index a0458c93..ca63927c 100644 --- a/docs/rabbitmq-server.1.xml +++ b/docs/rabbitmq-server.1.xml @@ -125,7 +125,6 @@ Defaults to 5672. <title>See also</title> <para> <citerefentry><refentrytitle>rabbitmq-env.conf</refentrytitle><manvolnum>5</manvolnum></citerefentry> - <citerefentry><refentrytitle>rabbitmq-multi</refentrytitle><manvolnum>1</manvolnum></citerefentry> <citerefentry><refentrytitle>rabbitmqctl</refentrytitle><manvolnum>1</manvolnum></citerefentry> </para> </refsect1> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index bd9fee7d..3550e5ea 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -158,6 +158,28 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>wait</command></cmdsynopsis></term> + <listitem> + <para> + Wait for the RabbitMQ application to start. + </para> + <para> + This command will wait for the RabbitMQ application to + start at the node. As long as the Erlang node is up but + the RabbitMQ application is down it will wait + indefinitely. If the node itself goes down, or takes + more than five seconds to come up, it will fail. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl wait</screen> + <para role="example"> + This command will return when the RabbitMQ node has + started up. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>status</command></cmdsynopsis></term> <listitem> <para> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 24d0f961..b9a01735 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -54,6 +54,12 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_edge, {trie_edge, node_id}). +-record(topic_trie_binding, {trie_binding, value = const}). + +-record(trie_edge, {exchange_name, node_id, word}). +-record(trie_binding, {exchange_name, node_id, destination}). + -record(listener, {node, protocol, host, ip_address, port}). -record(basic_message, {exchange_name, routing_key, content, guid, diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index 74a1800a..287945fe 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -31,7 +31,6 @@ prepare: cp ${COMMON_DIR}/* SOURCES/ sed -i \ - -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ SOURCES/rabbitmq-server.init sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 5d573bde..ae9b2059 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -55,7 +55,6 @@ mkdir -p %{buildroot}%{_localstatedir}/log/rabbitmq install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server -install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi install -p -D -m 0755 %{_rabbit_server_ocf} %{buildroot}%{_exec_prefix}/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server @@ -65,12 +64,8 @@ mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files -rm -f %{_builddir}/%{name}.files -echo '%defattr(-,root,root, -)' >> %{_builddir}/%{name}.files -(cd %{buildroot}; \ - find . -type f ! -regex '\.%{_sysconfdir}.*' \ - ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ - | sed -e 's/^\.//' >> %{_builddir}/%{name}.files) +echo '%defattr(-,root,root, -)' >%{_builddir}/%{name}.files +find %{buildroot} -path %{buildroot}%{_sysconfdir} -prune -o '!' -type d -printf "/%%P\n" >>%{_builddir}/%{name}.files %pre @@ -117,8 +112,6 @@ done %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq %dir %{_sysconfdir}/rabbitmq -%{_rabbit_erllibdir} -%{_rabbit_libdir} %{_initrddir}/rabbitmq-server %config(noreplace) %{_sysconfdir}/logrotate.d/rabbitmq-server %doc LICENSE LICENSE-MPL-RabbitMQ diff --git a/packaging/common/rabbitmq-server.init b/packaging/common/rabbitmq-server.init index 39d23983..916dee6f 100644 --- a/packaging/common/rabbitmq-server.init +++ b/packaging/common/rabbitmq-server.init @@ -17,75 +17,77 @@ ### END INIT INFO PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi NAME=rabbitmq-server +DAEMON=/usr/sbin/${NAME} +CONTROL=/usr/sbin/rabbitmqctl DESC=rabbitmq-server USER=rabbitmq -NODE_COUNT=1 ROTATE_SUFFIX= INIT_LOG_DIR=/var/log/rabbitmq -DEFAULTS_FILE= # This is filled in when building packages LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 -# Include rabbitmq defaults if available -if [ -f "$DEFAULTS_FILE" ] ; then - . $DEFAULTS_FILE -fi - RETVAL=0 set -e start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > ${INIT_LOG_DIR}/startup_log 2> ${INIT_LOG_DIR}/startup_err - case "$?" in - 0) - echo SUCCESS - [ -n "$LOCK_FILE" ] && touch $LOCK_FILE + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + echo RabbitMQ is currently running + else RETVAL=0 - ;; - 1) - echo TIMEOUT - check ${INIT_LOG_DIR}/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check ${INIT_LOG_DIR}/startup_log, _err - RETVAL=1 - ;; - esac - set -e + set +e + setsid sh -c "$DAEMON > ${INIT_LOG_DIR}/startup_log \ + 2> ${INIT_LOG_DIR}/startup_err" & + $CONTROL wait >/dev/null 2>&1 + RETVAL=$? + set -e + case "$RETVAL" in + 0) + echo SUCCESS + if [ -n "$LOCK_FILE" ] ; then + touch $LOCK_FILE + fi + ;; + *) + echo FAILED - check ${INIT_LOG_DIR}/startup_\{log, _err\} + RETVAL=1 + ;; + esac + fi } stop_rabbitmq () { - set +e status_rabbitmq quiet if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err + set +e + $CONTROL stop > ${INIT_LOG_DIR}/shutdown_log 2> ${INIT_LOG_DIR}/shutdown_err RETVAL=$? + set -e if [ $RETVAL = 0 ] ; then - [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE + if [ -n "$LOCK_FILE" ] ; then + rm -f $LOCK_FILE + fi else echo FAILED - check ${INIT_LOG_DIR}/shutdown_log, _err fi else - echo No nodes running + echo RabbitMQ is not running RETVAL=0 fi - set -e } status_rabbitmq() { set +e if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 + $CONTROL status 2>&1 else - $DAEMON status > /dev/null 2>&1 + $CONTROL status > /dev/null 2>&1 fi if [ $? != 0 ] ; then - RETVAL=1 + RETVAL=3 fi set -e } @@ -99,8 +101,18 @@ rotate_logs_rabbitmq() { set -e } +restart_running_rabbitmq () { + status_rabbitmq quiet + if [ $RETVAL = 0 ] ; then + restart_rabbitmq + else + echo RabbitMQ is not runnning + RETVAL=0 + fi +} + restart_rabbitmq() { - stop_rabbitmq + stop_rabbitmq start_rabbitmq } @@ -122,11 +134,16 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart|condrestart|try-restart) + force-reload|reload|restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." ;; + try-restart) + echo -n "Restarting $DESC: " + restart_running_rabbitmq + echo "$NAME." + ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 RETVAL=1 diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index dc0521dd..94999d0e 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -20,7 +20,7 @@ ## ## OCF instance parameters -## OCF_RESKEY_multi +## OCF_RESKEY_server ## OCF_RESKEY_ctl ## OCF_RESKEY_nodename ## OCF_RESKEY_ip @@ -38,11 +38,11 @@ ####################################################################### -OCF_RESKEY_multi_default="/usr/sbin/rabbitmq-multi" +OCF_RESKEY_server_default="/usr/sbin/rabbitmq-server" OCF_RESKEY_ctl_default="/usr/sbin/rabbitmqctl" OCF_RESKEY_nodename_default="rabbit@localhost" OCF_RESKEY_log_base_default="/var/log/rabbitmq" -: ${OCF_RESKEY_multi=${OCF_RESKEY_multi_default}} +: ${OCF_RESKEY_server=${OCF_RESKEY_server_default}} : ${OCF_RESKEY_ctl=${OCF_RESKEY_ctl_default}} : ${OCF_RESKEY_nodename=${OCF_RESKEY_nodename_default}} : ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} @@ -61,12 +61,12 @@ Resource agent for RabbitMQ-server <shortdesc lang="en">Resource agent for RabbitMQ-server</shortdesc> <parameters> -<parameter name="multi" unique="0" required="0"> +<parameter name="server" unique="0" required="0"> <longdesc lang="en"> -The path to the rabbitmq-multi script +The path to the rabbitmq-server script </longdesc> -<shortdesc lang="en">Path to rabbitmq-multi</shortdesc> -<content type="string" default="${OCF_RESKEY_multi_default}" /> +<shortdesc lang="en">Path to rabbitmq-server</shortdesc> +<content type="string" default="${OCF_RESKEY_server_default}" /> </parameter> <parameter name="ctl" unique="0" required="0"> @@ -155,7 +155,7 @@ Expects to have a fully populated OCF RA-compliant environment set. END } -RABBITMQ_MULTI=$OCF_RESKEY_multi +RABBITMQ_SERVER=$OCF_RESKEY_server RABBITMQ_CTL=$OCF_RESKEY_ctl RABBITMQ_NODENAME=$OCF_RESKEY_nodename RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip @@ -177,8 +177,8 @@ export_vars() { } rabbit_validate_partial() { - if [ ! -x $RABBITMQ_MULTI ]; then - ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + if [ ! -x $RABBITMQ_SERVER ]; then + ocf_log err "rabbitmq-server server $RABBITMQ_SERVER does not exist or is not executable"; exit $OCF_ERR_INSTALLED; fi @@ -210,8 +210,18 @@ rabbit_validate_full() { } rabbit_status() { + rabbitmqctl_action "status" +} + +rabbit_wait() { + rabbitmqctl_action "wait" +} + +rabbitmqctl_action() { local rc - $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null + local action + action=$1 + $RABBITMQ_CTL $NODENAME_ARG $action > /dev/null 2> /dev/null rc=$? case "$rc" in 0) @@ -223,7 +233,7 @@ rabbit_status() { return $OCF_NOT_RUNNING ;; *) - ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG $action: $rc" exit $OCF_ERR_GENERIC esac } @@ -238,28 +248,16 @@ rabbit_start() { export_vars - $RABBITMQ_MULTI start_all 1 > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err & - rc=$? - - if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" - return $rc - fi + setsid sh -c "$RABBITMQ_SERVER > ${RABBITMQ_LOG_BASE}/startup_log 2> ${RABBITMQ_LOG_BASE}/startup_err" & - # Spin waiting for the server to come up. + # Wait for the server to come up. # Let the CRM/LRM time us out if required - start_wait=1 - while [ $start_wait = 1 ]; do - rabbit_status - rc=$? - if [ "$rc" = $OCF_SUCCESS ]; then - start_wait=0 - elif [ "$rc" != $OCF_NOT_RUNNING ]; then - ocf_log info "rabbitmq-server start failed: $rc" - exit $OCF_ERR_GENERIC - fi - sleep 1 - done + rabbit_wait + rc=$? + if [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server start failed: $rc" + exit $OCF_ERR_GENERIC + fi return $OCF_SUCCESS } @@ -272,11 +270,11 @@ rabbit_stop() { return $OCF_SUCCESS fi - $RABBITMQ_MULTI stop_all & + $RABBITMQ_CTL stop rc=$? if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_CTL stop, $rc" return $rc fi diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index ab05f732..d937fbb2 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -23,7 +23,6 @@ package: clean cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ sed -i \ - -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ $(UNPACKED_DIR)/debian/rabbitmq-server.init sed -i -e 's|@SU_RABBITMQ_SH_C@|su rabbitmq -s /bin/sh -c|' \ diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 6b6df33b..a785b292 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -14,7 +14,7 @@ DOCDIR=$(DEB_DESTDIR)usr/share/doc/rabbitmq-server/ install/rabbitmq-server:: mkdir -p $(DOCDIR) rm $(RABBIT_LIB)LICENSE* $(RABBIT_LIB)INSTALL* - for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ + for script in rabbitmqctl rabbitmq-server; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 8c22a75e..7583d668 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -85,24 +85,22 @@ post-destroot { reinplace -E "s:(/etc/rabbitmq/rabbitmq):${prefix}\\1:g" \ ${realsbin}/rabbitmq-env - foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE PIDS_FILE} { + foreach var {CONFIG_FILE LOG_BASE MNESIA_BASE} { reinplace -E "s:^($var)=/:\\1=${prefix}/:" \ - ${realsbin}/rabbitmq-multi \ ${realsbin}/rabbitmq-server \ ${realsbin}/rabbitmqctl } xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ - ${wrappersbin}/rabbitmq-multi + ${wrappersbin}/rabbitmq-server reinplace -E "s:MACPORTS_PREFIX/bin:${prefix}/bin:" \ - ${wrappersbin}/rabbitmq-multi + ${wrappersbin}/rabbitmq-server reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ - ${wrappersbin}/rabbitmq-multi + ${wrappersbin}/rabbitmq-server reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ - ${wrappersbin}/rabbitmq-multi - file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server - file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl + ${wrappersbin}/rabbitmq-server + file copy ${wrappersbin}/rabbitmq-server ${wrappersbin}/rabbitmqctl xinstall -m 644 -W ${mansrc}/man1 rabbitmq-server.1.gz rabbitmqctl.1.gz \ ${mandest}/man1/ diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index abe174e0..dacfa620 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -11,7 +11,6 @@ dist: mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin - mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi deleted file mode 100755 index ebcf4b63..00000000 --- a/scripts/rabbitmq-multi +++ /dev/null @@ -1,72 +0,0 @@ -#!/bin/sh -## The contents of this file are subject to the Mozilla Public License -## Version 1.1 (the "License"); you may not use this file except in -## compliance with the License. You may obtain a copy of the License -## at http://www.mozilla.org/MPL/ -## -## Software distributed under the License is distributed on an "AS IS" -## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -## the License for the specific language governing rights and -## limitations under the License. -## -## The Original Code is RabbitMQ. -## -## The Initial Developer of the Original Code is VMware, Inc. -## Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -## - -SCRIPT_HOME=$(dirname $0) -PIDS_FILE=/var/lib/rabbitmq/pids -MULTI_ERL_ARGS= -MULTI_START_ARGS= -CONFIG_FILE=/etc/rabbitmq/rabbitmq - -. `dirname $0`/rabbitmq-env - -DEFAULT_NODE_IP_ADDRESS=0.0.0.0 -DEFAULT_NODE_PORT=5672 -[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && [ "x" != "x$NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} -[ "x" = "x$RABBITMQ_NODE_PORT" ] && [ "x" != "x$NODE_PORT" ] && RABBITMQ_NODE_PORT=${NODE_PORT} -if [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] -then - if [ "x" != "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_IP_ADDRESS=${DEFAULT_NODE_IP_ADDRESS} - fi -else - if [ "x" = "x$RABBITMQ_NODE_PORT" ] - then RABBITMQ_NODE_PORT=${DEFAULT_NODE_PORT} - fi -fi -[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} -[ "x" = "x$RABBITMQ_SCRIPT_HOME" ] && RABBITMQ_SCRIPT_HOME=${SCRIPT_HOME} -[ "x" = "x$RABBITMQ_PIDS_FILE" ] && RABBITMQ_PIDS_FILE=${PIDS_FILE} -[ "x" = "x$RABBITMQ_MULTI_ERL_ARGS" ] && RABBITMQ_MULTI_ERL_ARGS=${MULTI_ERL_ARGS} -[ "x" = "x$RABBITMQ_MULTI_START_ARGS" ] && RABBITMQ_MULTI_START_ARGS=${MULTI_START_ARGS} -[ "x" = "x$RABBITMQ_CONFIG_FILE" ] && RABBITMQ_CONFIG_FILE=${CONFIG_FILE} - -export \ - RABBITMQ_NODENAME \ - RABBITMQ_NODE_IP_ADDRESS \ - RABBITMQ_NODE_PORT \ - RABBITMQ_SCRIPT_HOME \ - RABBITMQ_PIDS_FILE \ - RABBITMQ_CONFIG_FILE - -RABBITMQ_CONFIG_ARG= -[ -f "${RABBITMQ_CONFIG_FILE}.config" ] && RABBITMQ_CONFIG_ARG="-config ${RABBITMQ_CONFIG_FILE}" - -# we need to turn off path expansion because some of the vars, notably -# RABBITMQ_MULTI_ERL_ARGS, may contain terms that look like globs and -# there is no other way of preventing their expansion. -set -f - -exec erl \ - -pa "${RABBITMQ_HOME}/ebin" \ - -noinput \ - -hidden \ - ${RABBITMQ_MULTI_ERL_ARGS} \ - -sname rabbitmq_multi$$ \ - ${RABBITMQ_CONFIG_ARG} \ - -s rabbit_multi \ - ${RABBITMQ_MULTI_START_ARGS} \ - -extra "$@" diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat deleted file mode 100644 index a2d10f2e..00000000 --- a/scripts/rabbitmq-multi.bat +++ /dev/null @@ -1,84 +0,0 @@ -@echo off
-REM The contents of this file are subject to the Mozilla Public License
-REM Version 1.1 (the "License"); you may not use this file except in
-REM compliance with the License. You may obtain a copy of the License
-REM at http://www.mozilla.org/MPL/
-REM
-REM Software distributed under the License is distributed on an "AS IS"
-REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-REM the License for the specific language governing rights and
-REM limitations under the License.
-REM
-REM The Original Code is RabbitMQ.
-REM
-REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-REM
-
-setlocal
-
-rem Preserve values that might contain exclamation marks before
-rem enabling delayed expansion
-set TDP0=%~dp0
-set STAR=%*
-setlocal enabledelayedexpansion
-
-if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
-)
-
-if "!COMPUTERNAME!"=="" (
- set COMPUTERNAME=localhost
-)
-
-if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
-)
-
-if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
- if not "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_IP_ADDRESS=0.0.0.0
- )
-) else (
- if "!RABBITMQ_NODE_PORT!"=="" (
- set RABBITMQ_NODE_PORT=5672
- )
-)
-
-set RABBITMQ_PIDS_FILE=!RABBITMQ_BASE!\rabbitmq.pids
-set RABBITMQ_SCRIPT_HOME=!TDP0!
-
-if "!RABBITMQ_CONFIG_FILE!"=="" (
- set RABBITMQ_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq
-)
-
-if exist "!RABBITMQ_CONFIG_FILE!.config" (
- set RABBITMQ_CONFIG_ARG=-config "!RABBITMQ_CONFIG_FILE!"
-) else (
- set RABBITMQ_CONFIG_ARG=
-)
-
-if not exist "!ERLANG_HOME!\bin\erl.exe" (
- echo.
- echo ******************************
- echo ERLANG_HOME not set correctly.
- echo ******************************
- echo.
- echo Please either set ERLANG_HOME to point to your Erlang installation or place the
- echo RabbitMQ server distribution in the Erlang lib folder.
- echo.
- exit /B
-)
-
-"!ERLANG_HOME!\bin\erl.exe" ^
--pa "!TDP0!..\ebin" ^
--noinput -hidden ^
-!RABBITMQ_MULTI_ERL_ARGS! ^
--sname rabbitmq_multi!RANDOM! ^
-!RABBITMQ_CONFIG_ARG! ^
--s rabbit_multi ^
-!RABBITMQ_MULTI_START_ARGS! ^
--extra !STAR!
-
-endlocal
-endlocal
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 1e1f37cb..f41815d0 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1, set_limit/1, get_limit/0]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0, + info/1]). -export([ulimit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -259,11 +260,17 @@ -spec(transfer/1 :: (pid()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). +-spec(info_keys/0 :: () -> [atom()]). +-spec(info/0 :: () -> [{atom(), any()}]). +-spec(info/1 :: ([atom()]) -> [{atom(), any()}]). -spec(ulimit/0 :: () -> 'infinity' | 'unknown' | non_neg_integer()). -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [obtain_count, obtain_limit]). + +%%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -494,6 +501,11 @@ set_limit(Limit) -> get_limit() -> gen_server:call(?SERVER, get_limit, infinity). +info_keys() -> ?INFO_KEYS. + +info() -> info(?INFO_KEYS). +info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -789,6 +801,12 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset, {Error, Handle} end. +infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. + +i(obtain_count, #fhc_state{obtain_count = Count}) -> Count; +i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit; +i(Item, _) -> throw({bad_argument, Item}). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -871,13 +889,18 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, false -> {noreply, run_pending_item(Item, State)} end; + handle_call({set_limit, Limit}, _From, State) -> {reply, ok, maybe_reduce( process_pending(State #fhc_state { limit = Limit, obtain_limit = obtain_limit(Limit) }))}; + handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> - {reply, Limit, State}. + {reply, Limit, State}; + +handle_call({info, Items}, _From, State) -> + {reply, infos(Items, State), State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> diff --git a/src/pg_local.erl b/src/pg_local.erl index fd515747..c9c3a3a7 100644 --- a/src/pg_local.erl +++ b/src/pg_local.erl @@ -83,7 +83,7 @@ get_members(Name) -> sync() -> ensure_started(), - gen_server:call(?MODULE, sync). + gen_server:call(?MODULE, sync, infinity). %%% %%% Callback functions from gen_server diff --git a/src/rabbit.erl b/src/rabbit.erl index 1beed5c1..faf484af 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -214,7 +214,8 @@ stop_and_halt() -> ok. status() -> - [{running_applications, application:which_applications()}] ++ + [{pid, list_to_integer(os:getpid())}, + {running_applications, application:which_applications()}] ++ rabbit_mnesia:status(). rotate_logs(BinarySuffix) -> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1c89539f..46b78c39 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -197,7 +197,7 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> arguments = Args, exclusive_owner = Owner, pid = none}), - case gen_server2:call(Q#amqqueue.pid, {init, false}) of + case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 end. @@ -324,10 +324,10 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(VHostPath, F) -> rabbit_misc:filter_exit_map(F, list(VHostPath)). info(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, info, infinity). + delegate_call(QPid, info). info(#amqqueue{ pid = QPid }, Items) -> - case delegate_call(QPid, {info, Items}, infinity) of + case delegate_call(QPid, {info, Items}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -337,7 +337,7 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end). info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end). consumers(#amqqueue{ pid = QPid }) -> - delegate_call(QPid, consumers, infinity). + delegate_call(QPid, consumers). consumers_all(VHostPath) -> lists:append( @@ -347,7 +347,7 @@ consumers_all(VHostPath) -> end)). stat(#amqqueue{pid = QPid}) -> - delegate_call(QPid, stat, infinity). + delegate_call(QPid, stat). emit_stats(#amqqueue{pid = QPid}) -> delegate_cast(QPid, emit_stats). @@ -356,9 +356,9 @@ delete_immediately(#amqqueue{ pid = QPid }) -> gen_server2:cast(QPid, delete_immediately). delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> - delegate_call(QPid, {delete, IfUnused, IfEmpty}, infinity). + delegate_call(QPid, {delete, IfUnused, IfEmpty}). -purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). +purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). deliver(QPid, Delivery = #delivery{immediate = true}) -> gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); @@ -370,7 +370,7 @@ deliver(QPid, Delivery) -> true. requeue(QPid, MsgIds, ChPid) -> - delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity). + delegate_call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}). @@ -399,17 +399,15 @@ limit_all(QPids, ChPid, LimiterPid) -> end). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> - delegate_call(QPid, {basic_get, ChPid, NoAck}, infinity). + delegate_call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> delegate_call(QPid, {basic_consume, NoAck, ChPid, - LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}, - infinity). + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> - ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}, - infinity). + ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). notify_sent(QPid, ChPid) -> gen_server2:cast(QPid, {notify_sent, ChPid}). @@ -500,8 +498,8 @@ safe_delegate_call_ok(F, Pids) -> {_, Bad} -> {error, Bad} end. -delegate_call(Pid, Msg, Timeout) -> - delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). +delegate_call(Pid, Msg) -> + delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, infinity) end). delegate_cast(Pid, Msg) -> delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index d67c7f58..dc81ace6 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -61,8 +61,7 @@ -spec(map_exception/3 :: (rabbit_channel:channel_number(), rabbit_types:amqp_error() | any(), rabbit_types:protocol()) -> - {boolean(), - rabbit_channel:channel_number(), + {rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()}). -endif. @@ -301,24 +300,21 @@ clear_encoded_content(Content = #content{}) -> map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = lookup_amqp_exception(Reason, Protocol), - ShouldClose = SuggestedClose orelse (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; _ -> Protocol:method_id(FailedMethod) end, - {CloseChannel, CloseMethod} = - case ShouldClose of - true -> {0, #'connection.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}}; - false -> {Channel, #'channel.close'{reply_code = ReplyCode, - reply_text = ReplyText, - class_id = ClassId, - method_id = MethodId}} - end, - {ShouldClose, CloseChannel, CloseMethod}. + case SuggestedClose orelse (Channel == 0) of + true -> {0, #'connection.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}}; + false -> {Channel, #'channel.close'{reply_code = ReplyCode, + reply_text = ReplyText, + class_id = ClassId, + method_id = MethodId}} + end. lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4fd7e7e4..34a5e5a4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,16 +20,16 @@ -behaviour(gen_server2). --export([start_link/8, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/9, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1]). +-export([emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2]). --record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, +-record(ch, {state, protocol, channel, reader_pid, writer_pid, limiter_pid, start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, @@ -67,10 +67,10 @@ -type(channel_number() :: non_neg_integer()). --spec(start_link/8 :: - (channel_number(), pid(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid(), - fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> +-spec(start_link/9 :: + (channel_number(), pid(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), @@ -90,16 +90,17 @@ -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(ready_for_close/1 :: (pid()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start_link(Channel, ReaderPid, WriterPid, User, VHost, Capabilities, +start_link(Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun) -> - gen_server2:start_link(?MODULE, - [Channel, ReaderPid, WriterPid, User, VHost, - Capabilities, CollectorPid, StartLimiterFun], []). + gen_server2:start_link( + ?MODULE, [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, CollectorPid, StartLimiterFun], []). do(Pid, Method) -> do(Pid, Method, none). @@ -108,7 +109,7 @@ do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). flush(Pid) -> - gen_server2:call(Pid, flush). + gen_server2:call(Pid, flush, infinity). shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -148,14 +149,18 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). +ready_for_close(Pid) -> + gen_server2:cast(Pid, ready_for_close). + %%--------------------------------------------------------------------------- -init([Channel, ReaderPid, WriterPid, User, VHost, Capabilities, CollectorPid, - StartLimiterFun]) -> +init([Channel, ReaderPid, WriterPid, Protocol, User, VHost, Capabilities, + CollectorPid, StartLimiterFun]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, + protocol = Protocol, channel = Channel, reader_pid = ReaderPid, writer_pid = WriterPid, @@ -222,14 +227,11 @@ handle_cast({method, Method, Content}, State) -> {noreply, NewState} -> noreply(NewState); stop -> - {stop, normal, State#ch{state = terminating}} + {stop, normal, State} catch exit:Reason = #amqp_error{} -> MethodName = rabbit_misc:method_record_type(Method), - {stop, normal, terminating(Reason#amqp_error{method = MethodName}, - State)}; - exit:normal -> - {stop, normal, State}; + send_exception(Reason#amqp_error{method = MethodName}, State); _:Reason -> {stop, {Reason, erlang:get_stacktrace()}, State} end; @@ -237,6 +239,11 @@ handle_cast({method, Method, Content}, State) -> handle_cast({flushed, QPid}, State) -> {noreply, queue_blocked(QPid, State), hibernate}; +handle_cast(ready_for_close, State = #ch{state = closing, + writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), + {stop, normal, State}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -309,18 +316,16 @@ handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), {hibernate, State#ch{stats_timer = StatsTimer1}}. -terminate(_Reason, State = #ch{state = terminating}) -> - terminate(State); - terminate(Reason, State) -> - Res = rollback_and_notify(State), + {Res, _State1} = rollback_and_notify(State), case Reason of normal -> ok = Res; shutdown -> ok = Res; {shutdown, _Term} -> ok = Res; _ -> ok end, - terminate(State). + pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -358,10 +363,22 @@ return_ok(State, false, Msg) -> {reply, Msg, State}. ok_msg(true, _Msg) -> undefined; ok_msg(false, Msg) -> Msg. -terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) -> - ok = rollback_and_notify(State), - Reader ! {channel_exit, Channel, Reason}, - State#ch{state = terminating}. +send_exception(Reason, State = #ch{protocol = Protocol, + channel = Channel, + writer_pid = WriterPid, + reader_pid = ReaderPid}) -> + {CloseChannel, CloseMethod} = + rabbit_binary_generator:map_exception(Channel, Reason, Protocol), + rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", + [ReaderPid, Channel, Reason]), + %% something bad's happened: rollback_and_notify may not be 'ok' + {_Result, State1} = rollback_and_notify(State), + case CloseChannel of + Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod), + {noreply, State1}; + _ -> ReaderPid ! {channel_exit, Channel, Reason}, + {stop, normal, State1} + end. return_queue_declare_ok(#resource{name = ActualName}, NoWait, MessageCount, ConsumerCount, State) -> @@ -544,11 +561,20 @@ handle_method(#'channel.open'{}, _, _State) -> handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); -handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = rollback_and_notify(State), - ok = rabbit_writer:send_command_sync(WriterPid, #'channel.close_ok'{}), +handle_method(#'channel.close_ok'{}, _, #ch{state = closing}) -> stop; +handle_method(#'channel.close'{}, _, State = #ch{state = closing}) -> + {reply, #'channel.close_ok'{}, State}; + +handle_method(_Method, _, State = #ch{state = closing}) -> + {noreply, State}; + +handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) -> + {ok, State1} = rollback_and_notify(State), + ReaderPid ! {channel_closing, self()}, + {noreply, State1}; + handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; @@ -1099,9 +1125,8 @@ binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}, - WriterPid, Reason) -> - {_Close, ReplyCode, ReplyText} = - rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), + #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> + {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1188,10 +1213,13 @@ internal_rollback(State = #ch{transaction_id = TxnKey, NewUAMQ = queue:join(UAQ, UAMQ), new_tx(State#ch{unacked_message_q = NewUAMQ}). +rollback_and_notify(State = #ch{state = closing}) -> + {ok, State}; rollback_and_notify(State = #ch{transaction_id = none}) -> - notify_queues(State); + {notify_queues(State), State#ch{state = closing}}; rollback_and_notify(State) -> - notify_queues(internal_rollback(State)). + State1 = internal_rollback(State), + {notify_queues(State1), State1#ch{state = closing}}. fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( @@ -1258,10 +1286,10 @@ is_message_persistent(Content) -> end. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_route), + ok = basic_return(Msg, State, no_route), record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State#ch.writer_pid, no_consumers), + ok = basic_return(Msg, State, no_consumers), record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); @@ -1334,10 +1362,6 @@ coalesce_and_send(MsgSeqNos, MkMsgFun, WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -terminate(_State) -> - pg_local:leave(rabbit_channels, self()), - rabbit_event:notify(channel_closed, [{pid, self()}]). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _) -> self(); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 90058194..9cc407bc 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -31,12 +31,13 @@ -export_type([start_link_args/0]). -type(start_link_args() :: - {'tcp', rabbit_types:protocol(), rabbit_net:socket(), - rabbit_channel:channel_number(), non_neg_integer(), pid(), - rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + {'tcp', rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_types:protocol(), rabbit_types:user(), + rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()} | - {'direct', rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()}). + {'direct', rabbit_channel:channel_number(), pid(), + rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), + rabbit_framing:amqp_table(), pid()}). -spec(start_link/1 :: (start_link_args()) -> {'ok', pid(), {pid(), any()}}). @@ -44,7 +45,7 @@ %%---------------------------------------------------------------------------- -start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, +start_link({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, User, VHost, Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, WriterPid} = @@ -57,20 +58,20 @@ start_link({tcp, Protocol, Sock, Channel, FrameMax, ReaderPid, User, VHost, supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ReaderPid, WriterPid, User, VHost, Capabilities, - Collector, start_limiter_fun(SupPid)]}, + [Channel, ReaderPid, WriterPid, Protocol, User, VHost, + Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; -start_link({direct, Channel, ClientChannelPid, User, VHost, Capabilities, - Collector}) -> +start_link({direct, Channel, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, ChannelPid} = supervisor2:start_child( SupPid, {channel, {rabbit_channel, start_link, - [Channel, ClientChannelPid, ClientChannelPid, User, - VHost, Capabilities, Collector, + [Channel, ClientChannelPid, ClientChannelPid, Protocol, + User, VHost, Capabilities, Collector, start_limiter_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 3a18950f..746bb66e 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -20,6 +20,7 @@ -export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). +-define(WAIT_FOR_VM_ATTEMPTS, 5). -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). @@ -293,7 +294,30 @@ action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost ~p", [VHost]), display_list(call(Node, {rabbit_auth_backend_internal, - list_vhost_permissions, [VHost]})). + list_vhost_permissions, [VHost]})); + +action(wait, Node, [], _Opts, Inform) -> + Inform("Waiting for ~p", [Node]), + wait_for_application(Node, ?WAIT_FOR_VM_ATTEMPTS). + +wait_for_application(Node, Attempts) -> + case rpc_call(Node, application, which_applications, [infinity]) of + {badrpc, _} = E -> NewAttempts = Attempts - 1, + case NewAttempts of + 0 -> E; + _ -> wait_for_application0(Node, NewAttempts) + end; + Apps -> case proplists:is_defined(rabbit, Apps) of + %% We've seen the node up; if it goes down + %% die immediately. + true -> ok; + false -> wait_for_application0(Node, 0) + end + end. + +wait_for_application0(Node, Attempts) -> + timer:sleep(1000), + wait_for_application(Node, Attempts). default_if_empty(List, Default) when is_list(List) -> if List == [] -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 5c89bf49..586563f6 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, connect/4, start_channel/6]). +-export([boot/0, connect/4, start_channel/7]). -include("rabbit.hrl"). @@ -28,10 +28,10 @@ -spec(connect/4 :: (binary(), binary(), binary(), rabbit_types:protocol()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). --spec(start_channel/6 :: - (rabbit_channel:channel_number(), pid(), rabbit_types:user(), - rabbit_types:vhost(), rabbit_framing:amqp_table(), pid()) -> - {'ok', pid()}). +-spec(start_channel/7 :: + (rabbit_channel:channel_number(), pid(), rabbit_types:protocol(), + rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), + pid()) -> {'ok', pid()}). -endif. @@ -69,10 +69,11 @@ connect(Username, Password, VHost, Protocol) -> {error, broker_not_found_on_node} end. -start_channel(Number, ClientChannelPid, User, VHost, Capabilities, Collector) -> +start_channel(Number, ClientChannelPid, Protocol, User, VHost, Capabilities, + Collector) -> {ok, _, {ChannelPid, _}} = supervisor2:start_child( rabbit_direct_client_sup, - [{direct, Number, ClientChannelPid, User, VHost, Capabilities, - Collector}]), + [{direct, Number, ClientChannelPid, Protocol, User, VHost, + Capabilities, Collector}]), {ok, ChannelPid}. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 9cbf8100..c1741b30 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -15,6 +15,7 @@ %% -module(rabbit_exchange_type_topic). + -include("rabbit.hrl"). -behaviour(rabbit_exchange_type). @@ -31,58 +32,223 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). --export([topic_matches/2]). - --ifdef(use_specs). - --spec(topic_matches/2 :: (binary(), binary()) -> boolean()). - --endif. +%%---------------------------------------------------------------------------- description() -> [{name, <<"topic">>}, {description, <<"AMQP topic exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, - #delivery{message = #basic_message{routing_key = RoutingKey}}) -> - rabbit_router:match_bindings(Name, - fun (#binding{key = BindingKey}) -> - topic_matches(BindingKey, RoutingKey) - end). - -split_topic_key(Key) -> - string:tokens(binary_to_list(Key), "."). - -topic_matches(PatternKey, RoutingKey) -> - P = split_topic_key(PatternKey), - R = split_topic_key(RoutingKey), - topic_matches1(P, R). - -topic_matches1(["#"], _R) -> - true; -topic_matches1(["#" | PTail], R) -> - last_topic_match(PTail, [], lists:reverse(R)); -topic_matches1([], []) -> - true; -topic_matches1(["*" | PatRest], [_ | ValRest]) -> - topic_matches1(PatRest, ValRest); -topic_matches1([PatElement | PatRest], [ValElement | ValRest]) - when PatElement == ValElement -> - topic_matches1(PatRest, ValRest); -topic_matches1(_, _) -> - false. - -last_topic_match(P, R, []) -> - topic_matches1(P, R); -last_topic_match(P, R, [BacktrackNext | BacktrackList]) -> - topic_matches1(P, R) or - last_topic_match(P, [BacktrackNext | R], BacktrackList). +%% NB: This may return duplicate results in some situations (that's ok) +route(#exchange{name = X}, + #delivery{message = #basic_message{routing_key = Key}}) -> + Words = split_topic_key(Key), + mnesia:async_dirty(fun trie_match/2, [X, Words]). validate(_X) -> ok. create(_Tx, _X) -> ok. -recover(_X, _Bs) -> ok. -delete(_Tx, _X, _Bs) -> ok. -add_binding(_Tx, _X, _B) -> ok. -remove_bindings(_Tx, _X, _Bs) -> ok. + +recover(_Exchange, Bs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> + lists:foreach(fun (B) -> internal_add_binding(B) end, Bs) + end). + +delete(true, #exchange{name = X}, _Bs) -> + trie_remove_all_edges(X), + trie_remove_all_bindings(X), + ok; +delete(false, _Exchange, _Bs) -> + ok. + +add_binding(true, _Exchange, Binding) -> + internal_add_binding(Binding); +add_binding(false, _Exchange, _Binding) -> + ok. + +remove_bindings(true, _X, Bs) -> + lists:foreach(fun remove_binding/1, Bs), + ok; +remove_bindings(false, _X, _Bs) -> + ok. + +remove_binding(#binding{source = X, key = K, destination = D}) -> + Path = [{FinalNode, _} | _] = follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path), + ok. + assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). + +%%---------------------------------------------------------------------------- + +internal_add_binding(#binding{source = X, key = K, destination = D}) -> + FinalNode = follow_down_create(X, split_topic_key(K)), + trie_add_binding(X, FinalNode, D), + ok. + +trie_match(X, Words) -> + trie_match(X, root, Words, []). + +trie_match(X, Node, [], ResAcc) -> + trie_match_part(X, Node, "#", fun trie_match_skip_any/4, [], + trie_bindings(X, Node) ++ ResAcc); +trie_match(X, Node, [W | RestW] = Words, ResAcc) -> + lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> + trie_match_part(X, Node, WArg, MatchFun, RestWArg, Acc) + end, ResAcc, [{W, fun trie_match/4, RestW}, + {"*", fun trie_match/4, RestW}, + {"#", fun trie_match_skip_any/4, Words}]). + +trie_match_part(X, Node, Search, MatchFun, RestW, ResAcc) -> + case trie_child(X, Node, Search) of + {ok, NextNode} -> MatchFun(X, NextNode, RestW, ResAcc); + error -> ResAcc + end. + +trie_match_skip_any(X, Node, [], ResAcc) -> + trie_match(X, Node, [], ResAcc); +trie_match_skip_any(X, Node, [_ | RestW] = Words, ResAcc) -> + trie_match_skip_any(X, Node, RestW, + trie_match(X, Node, Words, ResAcc)). + +follow_down_create(X, Words) -> + case follow_down_last_node(X, Words) of + {ok, FinalNode} -> FinalNode; + {error, Node, RestW} -> lists:foldl( + fun (W, CurNode) -> + NewNode = new_node_id(), + trie_add_edge(X, CurNode, NewNode, W), + NewNode + end, Node, RestW) + end. + +follow_down_last_node(X, Words) -> + follow_down(X, fun (_, Node, _) -> Node end, root, Words). + +follow_down_get_path(X, Words) -> + {ok, Path} = + follow_down(X, fun (W, Node, PathAcc) -> [{Node, W} | PathAcc] end, + [{root, none}], Words), + Path. + +follow_down(X, AccFun, Acc0, Words) -> + follow_down(X, root, AccFun, Acc0, Words). + +follow_down(_X, _CurNode, _AccFun, Acc, []) -> + {ok, Acc}; +follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> + case trie_child(X, CurNode, W) of + {ok, NextNode} -> follow_down(X, NextNode, AccFun, + AccFun(W, NextNode, Acc), RestW); + error -> {error, Acc, Words} + end. + +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case trie_has_any_bindings(X, Node) orelse trie_has_any_children(X, Node) of + true -> ok; + false -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath) + end. + +trie_child(X, Node, Word) -> + case mnesia:read(rabbit_topic_trie_edge, + #trie_edge{exchange_name = X, + node_id = Node, + word = Word}) of + [#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode}; + [] -> error + end. + +trie_bindings(X, Node) -> + MatchHead = #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = '$1'}}, + mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). + +trie_add_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). + +trie_remove_edge(X, FromNode, ToNode, W) -> + trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). + +trie_edge_op(X, FromNode, ToNode, W, Op) -> + ok = Op(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = FromNode, + word = W}, + node_id = ToNode}, + write). + +trie_add_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:write/3). + +trie_remove_binding(X, Node, D) -> + trie_binding_op(X, Node, D, fun mnesia:delete_object/3). + +trie_binding_op(X, Node, D, Op) -> + ok = Op(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + destination = D}}, + write). + +trie_has_any_children(X, Node) -> + has_any(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_has_any_bindings(X, Node) -> + has_any(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, + node_id = Node, + _ = '_'}, + _ = '_'}). + +trie_remove_all_edges(X) -> + remove_all(rabbit_topic_trie_edge, + #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, + _ = '_'}, + _ = '_'}). + +trie_remove_all_bindings(X) -> + remove_all(rabbit_topic_trie_binding, + #topic_trie_binding{ + trie_binding = #trie_binding{exchange_name = X, _ = '_'}, + _ = '_'}). + +has_any(Table, MatchHead) -> + Select = mnesia:select(Table, [{MatchHead, [], ['$_']}], 1, read), + select_while_no_result(Select) /= '$end_of_table'. + +select_while_no_result({[], Cont}) -> + select_while_no_result(mnesia:select(Cont)); +select_while_no_result(Other) -> + Other. + +remove_all(Table, Pattern) -> + lists:foreach(fun (R) -> mnesia:delete_object(Table, R, write) end, + mnesia:match_object(Table, Pattern, write)). + +new_node_id() -> + rabbit_guid:guid(). + +split_topic_key(Key) -> + split_topic_key(Key, [], []). + +split_topic_key(<<>>, [], []) -> + []; +split_topic_key(<<>>, RevWordAcc, RevResAcc) -> + lists:reverse([lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]); +split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) -> + split_topic_key(Rest, [C | RevWordAcc], RevResAcc). + diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 86ea7282..1b72dd76 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -65,7 +65,7 @@ start_link(ChPid, UnackedMsgCount) -> limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:call(LimiterPid, {limit, PrefetchCount}). + gen_server2:call(LimiterPid, {limit, PrefetchCount}, infinity). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index a9b4e177..fc95b77b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -20,7 +20,7 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). -export([table_names/0]). @@ -54,6 +54,7 @@ -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). +-spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -endif. @@ -185,6 +186,17 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, %% Consider the implications to nodes_of_type/1 before altering %% the next entry. {rabbit_durable_exchange, @@ -216,6 +228,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_edge_match() -> + #trie_edge{exchange_name = exchange_name_match(), + _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), + _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> @@ -264,45 +282,48 @@ ensure_schema_integrity() -> check_schema_integrity() -> Tables = mnesia:system_info(tables), - case [Error || {Tab, TabDef} <- table_definitions(), - case lists:member(Tab, Tables) of - false -> - Error = {table_missing, Tab}, - true; - true -> - {_, ExpAttrs} = proplists:lookup(attributes, TabDef), - Attrs = mnesia:table_info(Tab, attributes), - Error = {table_attributes_mismatch, Tab, - ExpAttrs, Attrs}, - Attrs /= ExpAttrs - end] of - [] -> check_table_integrity(); - Errors -> {error, Errors} + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other end. -check_table_integrity() -> - ok = wait_for_tables(), - case lists:all(fun ({Tab, TabDef}) -> - {_, Match} = proplists:lookup(match, TabDef), - read_test_table(Tab, Match) - end, table_definitions()) of - true -> ok; - false -> {error, invalid_table_content} +check_table_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} end. -read_test_table(Tab, Match) -> +check_table_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), case mnesia:dirty_first(Tab) of '$end_of_table' -> - true; + ok; Key -> ObjList = mnesia:dirty_read(Tab, Key), MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), case ets:match_spec_run(ObjList, MatchComp) of - ObjList -> true; - _ -> false + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} end end. +check_tables(Fun) -> + case [Error || {Tab, TabDef} <- table_definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + %% The cluster node config file contains some or all of the disk nodes %% that are members of the cluster this node is / should be a part of. %% @@ -369,17 +390,15 @@ init_db(ClusterNodes, Force) -> case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade - ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_ok(); + ok -> ensure_schema_integrity(); version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, %% verify schema - ok = wait_for_tables(), ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_ok(); + ensure_schema_integrity(); {[], false, _} -> %% Nothing there at all, start from scratch ok = create_schema(); @@ -396,7 +415,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_ok() + ensure_schema_integrity() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -429,12 +448,6 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_upgrade:write_version(). -ensure_schema_ok() -> - case check_schema_integrity() of - ok -> ok; - {error, Reason} -> throw({error, {schema_invalid, Reason}}) - end. - create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -443,7 +456,6 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(), ok = rabbit_upgrade:write_version(). move_db() -> @@ -472,8 +484,7 @@ copy_db(Destination) -> mnesia:stop(), case rabbit_misc:recursive_copy(dir(), Destination) of ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = wait_for_tables(); + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); {error, E} -> {error, E} end. @@ -541,7 +552,8 @@ wait_for_tables() -> wait_for_tables(table_names()). wait_for_tables(TableNames) -> case mnesia:wait_for_tables(TableNames, 30000) of - ok -> ok; + ok -> + ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); {error, Reason} -> diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl deleted file mode 100644 index ebd7fe8a..00000000 --- a/src/rabbit_multi.erl +++ /dev/null @@ -1,349 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. -%% - --module(rabbit_multi). --include("rabbit.hrl"). - --export([start/0, stop/0]). - --define(RPC_SLEEP, 500). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start/0 :: () -> no_return()). --spec(stop/0 :: () -> 'ok'). --spec(usage/0 :: () -> no_return()). - --endif. - -%%---------------------------------------------------------------------------- - -start() -> - RpcTimeout = - case init:get_argument(maxwait) of - {ok,[[N1]]} -> 1000 * list_to_integer(N1); - _ -> ?MAX_WAIT - end, - case init:get_plain_arguments() of - [] -> - usage(); - FullCommand -> - {Command, Args} = parse_args(FullCommand), - case catch action(Command, Args, RpcTimeout) of - ok -> - io:format("done.~n"), - halt(); - {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join(FullCommand, " ")]), - usage(); - timeout -> - print_error("timeout starting some nodes.", []), - halt(1); - Other -> - print_error("~p", [Other]), - halt(2) - end - end. - -print_error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). - -parse_args([Command | Args]) -> - {list_to_atom(Command), Args}. - -stop() -> - ok. - -usage() -> - io:format("~s", [rabbit_multi_usage:usage()]), - halt(1). - -action(start_all, [NodeCount], RpcTimeout) -> - io:format("Starting all nodes...~n", []), - application:load(rabbit), - {_NodeNamePrefix, NodeHost} = NodeName = rabbit_misc:nodeparts( - getenv("RABBITMQ_NODENAME")), - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - throw({cannot_connect_to_epmd, NodeHost, EpmdReason}); - {ok, _} -> - ok - end, - {NodePids, Running} = - case list_to_integer(NodeCount) of - 1 -> {NodePid, Started} = start_node(rabbit_misc:makenode(NodeName), - RpcTimeout), - {[NodePid], Started}; - N -> start_nodes(N, N, [], true, NodeName, - get_node_tcp_listener(), RpcTimeout) - end, - write_pids_file(NodePids), - case Running of - true -> ok; - false -> timeout - end; - -action(status, [], RpcTimeout) -> - io:format("Status of all running nodes...~n", []), - call_all_nodes( - fun ({Node, Pid}) -> - RabbitRunning = - case is_rabbit_running(Node, RpcTimeout) of - false -> not_running; - true -> running - end, - io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, RabbitRunning]) - end); - -action(stop_all, [], RpcTimeout) -> - io:format("Stopping all nodes...~n", []), - call_all_nodes(fun ({Node, Pid}) -> - io:format("Stopping node ~p~n", [Node]), - rpc:call(Node, rabbit, stop_and_halt, []), - case kill_wait(Pid, RpcTimeout, false) of - false -> kill_wait(Pid, RpcTimeout, true); - true -> ok - end, - io:format("OK~n", []) - end), - delete_pids_file(); - -action(rotate_logs, [], RpcTimeout) -> - action(rotate_logs, [""], RpcTimeout); - -action(rotate_logs, [Suffix], RpcTimeout) -> - io:format("Rotating logs for all nodes...~n", []), - BinarySuffix = list_to_binary(Suffix), - call_all_nodes( - fun ({Node, _}) -> - io:format("Rotating logs for node ~p", [Node]), - case rpc:call(Node, rabbit, rotate_logs, - [BinarySuffix], RpcTimeout) of - {badrpc, Error} -> io:format(": ~p.~n", [Error]); - ok -> io:format(": ok.~n", []) - end - end). - -%% PNodePid is the list of PIDs -%% Running is a boolean exhibiting success at some moment -start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; - -start_nodes(N, Total, PNodePid, Running, NodeNameBase, Listener, RpcTimeout) -> - {NodePre, NodeSuff} = NodeNameBase, - NodeNumber = Total - N, - NodePre1 = case NodeNumber of - %% For compatibility with running a single node - 0 -> NodePre; - _ -> NodePre ++ "_" ++ integer_to_list(NodeNumber) - end, - Node = rabbit_misc:makenode({NodePre1, NodeSuff}), - os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), - case Listener of - {NodeIpAddress, NodePortBase} -> - NodePort = NodePortBase + NodeNumber, - os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), - os:putenv("RABBITMQ_NODE_IP_ADDRESS", NodeIpAddress); - undefined -> - ok - end, - {NodePid, Started} = start_node(Node, RpcTimeout), - start_nodes(N - 1, Total, [NodePid | PNodePid], - Started and Running, NodeNameBase, Listener, RpcTimeout). - -start_node(Node, RpcTimeout) -> - io:format("Starting node ~s...~n", [Node]), - case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> - Port = run_rabbitmq_server(), - Started = wait_for_rabbit_to_start(Node, RpcTimeout, Port), - Pid = case rpc:call(Node, os, getpid, []) of - {badrpc, _} -> throw(cannot_get_pid); - PidS -> list_to_integer(PidS) - end, - io:format("~s~n", [case Started of - true -> "OK"; - false -> "timeout" - end]), - {{Node, Pid}, Started}; - PidS -> - Pid = list_to_integer(PidS), - throw({node_already_running, Node, Pid}) - end. - -wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> - false; -wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case is_rabbit_running(Node, RpcTimeout) of - true -> true; - false -> receive - {'EXIT', Port, PosixCode} -> - throw({node_start_failed, PosixCode}) - after ?RPC_SLEEP -> - wait_for_rabbit_to_start( - Node, RpcTimeout - ?RPC_SLEEP, Port) - end - end. - -run_rabbitmq_server() -> - with_os([{unix, fun run_rabbitmq_server_unix/0}, - {win32, fun run_rabbitmq_server_win32/0}]). - -run_rabbitmq_server_unix() -> - CmdLine = getenv("RABBITMQ_SCRIPT_HOME") ++ "/rabbitmq-server -noinput", - erlang:open_port({spawn, CmdLine}, [nouse_stdio]). - -run_rabbitmq_server_win32() -> - Cmd = filename:nativename(os:find_executable("cmd")), - CmdLine = "\"" ++ getenv("RABBITMQ_SCRIPT_HOME") ++ - "\\rabbitmq-server.bat\" -noinput -detached", - erlang:open_port({spawn_executable, Cmd}, - [{arg0, Cmd}, {args, ["/q", "/s", "/c", CmdLine]}, - nouse_stdio]). - -is_rabbit_running(Node, RpcTimeout) -> - case rpc:call(Node, rabbit, status, [], RpcTimeout) of - {badrpc, _} -> false; - Status -> case proplists:get_value(running_applications, Status) of - undefined -> false; - Apps -> lists:keymember(rabbit, 1, Apps) - end - end. - -with_os(Handlers) -> - {OsFamily, _} = os:type(), - case proplists:get_value(OsFamily, Handlers) of - undefined -> throw({unsupported_os, OsFamily}); - Handler -> Handler() - end. - -pids_file() -> getenv("RABBITMQ_PIDS_FILE"). - -write_pids_file(Pids) -> - FileName = pids_file(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> - Device; - {error, Reason} -> - throw({cannot_create_pids_file, FileName, Reason}) - end, - try - ok = io:write(Handle, Pids), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({cannot_create_pids_file, FileName, Reason1}) - end - end, - ok. - -delete_pids_file() -> - FileName = pids_file(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({cannot_delete_pids_file, FileName, Reason}) - end. - -read_pids_file() -> - FileName = pids_file(), - case file:consult(FileName) of - {ok, [Pids]} -> Pids; - {error, enoent} -> []; - {error, Reason} -> throw({cannot_read_pids_file, FileName, Reason}) - end. - -kill_wait(Pid, TimeLeft, Forceful) when TimeLeft < 0 -> - Cmd = with_os([{unix, fun () -> if Forceful -> "kill -9"; - true -> "kill" - end - end}, - %% Kill forcefully always on Windows, since erl.exe - %% seems to completely ignore non-forceful killing - %% even when everything is working - {win32, fun () -> "taskkill /f /pid" end}]), - os:cmd(Cmd ++ " " ++ integer_to_list(Pid)), - false; % Don't assume what we did just worked! - -% Returns true if the process is dead, false otherwise. -kill_wait(Pid, TimeLeft, Forceful) -> - timer:sleep(?RPC_SLEEP), - io:format(".", []), - is_dead(Pid) orelse kill_wait(Pid, TimeLeft - ?RPC_SLEEP, Forceful). - -% Test using some OS clunkiness since we shouldn't trust -% rpc:call(os, getpid, []) at this point -is_dead(Pid) -> - PidS = integer_to_list(Pid), - with_os([{unix, fun () -> - system("kill -0 " ++ PidS - ++ " >/dev/null 2>&1") /= 0 - end}, - {win32, fun () -> - Res = os:cmd("tasklist /nh /fi \"pid eq " ++ - PidS ++ "\" 2>&1"), - case re:run(Res, "erl\\.exe", [{capture, none}]) of - match -> false; - _ -> true - end - end}]). - -% Like system(3) -system(Cmd) -> - ShCmd = "sh -c '" ++ escape_quotes(Cmd) ++ "'", - Port = erlang:open_port({spawn, ShCmd}, [exit_status,nouse_stdio]), - receive {Port, {exit_status, Status}} -> Status end. - -% Escape the quotes in a shell command so that it can be used in "sh -c 'cmd'" -escape_quotes(Cmd) -> - lists:flatten(lists:map(fun ($') -> "'\\''"; (Ch) -> Ch end, Cmd)). - -call_all_nodes(Func) -> - case read_pids_file() of - [] -> throw(no_nodes_running); - NodePids -> lists:foreach(Func, NodePids) - end. - -getenv(Var) -> - case os:getenv(Var) of - false -> throw({missing_env_var, Var}); - Value -> Value - end. - -get_node_tcp_listener() -> - try - {getenv("RABBITMQ_NODE_IP_ADDRESS"), - list_to_integer(getenv("RABBITMQ_NODE_PORT"))} - catch _ -> - case application:get_env(rabbit, tcp_listeners) of - {ok, [{_IpAddy, _Port} = Listener]} -> - Listener; - {ok, [Port]} when is_number(Port) -> - {"0.0.0.0", Port}; - {ok, []} -> - undefined; - {ok, Other} -> - throw({cannot_start_multiple_nodes, multiple_tcp_listeners, - Other}); - undefined -> - throw({missing_configuration, tcp_listeners}) - end - end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e9ff97f9..3908b646 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,92 +57,6 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). -%% connection lifecycle -%% -%% all state transitions and terminations are marked with *...* -%% -%% The lifecycle begins with: start handshake_timeout timer, *pre-init* -%% -%% all states, unless specified otherwise: -%% socket error -> *exit* -%% socket close -> *throw* -%% writer send failure -> *throw* -%% forced termination -> *exit* -%% handshake_timeout -> *throw* -%% pre-init: -%% receive protocol header -> send connection.start, *starting* -%% starting: -%% receive connection.start_ok -> *securing* -%% securing: -%% check authentication credentials -%% if authentication success -> send connection.tune, *tuning* -%% if more challenge needed -> send connection.secure, -%% receive connection.secure_ok *securing* -%% otherwise send close, *exit* -%% tuning: -%% receive connection.tune_ok -> start heartbeats, *opening* -%% opening: -%% receive connection.open -> send connection.open_ok, *running* -%% running: -%% receive connection.close -> -%% tell channels to terminate gracefully -%% if no channels then send connection.close_ok, start -%% terminate_connection timer, *closed* -%% else *closing* -%% forced termination -%% -> wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *exit* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing, *running* -%% handshake_timeout -> ignore, *running* -%% heartbeat timeout -> *throw* -%% conserve_memory=true -> *blocking* -%% blocking: -%% conserve_memory=true -> *blocking* -%% conserve_memory=false -> *running* -%% receive a method frame for a content-bearing method -%% -> process, stop receiving, *blocked* -%% ...rest same as 'running' -%% blocked: -%% conserve_memory=true -> *blocked* -%% conserve_memory=false -> resume receiving, *running* -%% ...rest same as 'running' -%% closing: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closing* -%% receive frame -> ignore, *closing* -%% handshake_timeout -> ignore, *closing* -%% heartbeat timeout -> *throw* -%% channel exit with hard error -%% -> log error, wait for channels to terminate forcefully, start -%% terminate_connection timer, send close, *closed* -%% channel exit with soft error -%% -> log error, mark channel as closing -%% if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% else *closing* -%% channel exits normally -%% -> if last channel to exit then send connection.close_ok, -%% start terminate_connection timer, *closed* -%% closed: -%% socket close -> *terminate* -%% receive connection.close -> send connection.close_ok, -%% *closed* -%% receive connection.close_ok -> self() ! terminate_connection, -%% *closed* -%% receive frame -> ignore, *closed* -%% terminate_connection timeout -> *terminate* -%% handshake_timeout -> ignore, *closed* -%% heartbeat timeout -> *throw* -%% channel exit -> log error, *closed* -%% -%% -%% TODO: refactor the code so that the above is obvious - -define(IS_RUNNING(State), (State#v1.connection_state =:= running orelse State#v1.connection_state =:= blocking orelse @@ -337,6 +251,10 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> mainloop(Deb, internal_conserve_memory(Conserve, State)); + {channel_closing, ChPid} -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -444,32 +362,32 @@ close_connection(State = #v1{queue_collector = Collector, erlang:send_after(TimeoutMillisec, self(), terminate_connection), State#v1{connection_state = closed}. -close_channel(Channel, State) -> - put({channel, Channel}, closing), - State. - handle_dependent_exit(ChPid, Reason, State) -> case termination_kind(Reason) of controlled -> - erase({ch_pid, ChPid}), + channel_cleanup(ChPid), maybe_close(State); uncontrolled -> case channel_cleanup(ChPid) of undefined -> exit({abnormal_dependent_exit, ChPid, Reason}); - Channel -> maybe_close( + Channel -> rabbit_log:error( + "connection ~p, channel ~p - error:~n~p~n", + [self(), Channel, Reason]), + maybe_close( handle_exception(State, Channel, Reason)) end end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of - undefined -> undefined; - Channel -> erase({channel, Channel}), - erase({ch_pid, ChPid}), - Channel + undefined -> undefined; + {Channel, MRef} -> erase({channel, Channel}), + erase({ch_pid, ChPid}), + erlang:demonitor(MRef, [flush]), + Channel end. -all_channels() -> [ChPid || {{ch_pid, ChPid}, _Channel} <- get()]. +all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. terminate_channels() -> NChannels = @@ -524,8 +442,8 @@ maybe_close(State = #v1{connection_state = closing, maybe_close(State) -> State. -termination_kind(normal) -> controlled; -termination_kind(_) -> uncontrolled. +termination_kind(normal) -> controlled; +termination_kind(_) -> uncontrolled. handle_frame(Type, 0, Payload, State = #v1{connection_state = CS, @@ -561,8 +479,8 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), case AnalyzedFrame of - {method, 'channel.close', _} -> - erase({channel, Channel}), + {method, 'channel.close_ok', _} -> + channel_cleanup(ChPid), State; {method, MethodName, _} -> case (State#v1.connection_state =:= blocking @@ -574,25 +492,6 @@ handle_frame(Type, Channel, Payload, _ -> State end; - closing -> - %% According to the spec, after sending a - %% channel.close we must ignore all frames except - %% channel.close and channel.close_ok. In the - %% event of a channel.close, we should send back a - %% channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erase({channel, Channel}); - {method, 'channel.close', _} -> - %% We're already closing this channel, so - %% there's no cleanup to do (notify - %% queues, etc.) - ok = rabbit_writer:internal_send_command( - State#v1.sock, Channel, - #'channel.close_ok'{}, Protocol); - _ -> ok - end, - State; undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -969,13 +868,13 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> capabilities = Capabilities}} = State, {ok, _ChSupPid, {ChPid, AState}} = rabbit_channel_sup_sup:start_channel( - ChanSupSup, {tcp, Protocol, Sock, Channel, FrameMax, self(), User, + ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), - erlang:monitor(process, ChPid), + MRef = erlang:monitor(process, ChPid), NewAState = process_channel_frame(AnalyzedFrame, self(), Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - put({ch_pid, ChPid}, Channel), + put({ch_pid, ChPid}, {Channel, MRef}), State. process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> @@ -991,29 +890,20 @@ process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> AState end. -log_channel_error(ConnectionState, Channel, Reason) -> - rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", - [self(), ConnectionState, Channel, Reason]). - -handle_exception(State = #v1{connection_state = closed}, Channel, Reason) -> - log_channel_error(closed, Channel, Reason), +handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) -> State; -handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> - log_channel_error(CS, Channel, Reason), +handle_exception(State, Channel, Reason) -> send_exception(State, Channel, Reason). send_exception(State = #v1{connection = #connection{protocol = Protocol}}, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = + {0, CloseMethod} = rabbit_binary_generator:map_exception(Channel, Reason, Protocol), - NewState = case ShouldClose of - true -> terminate_channels(), - close_connection(State); - false -> close_channel(Channel, State) - end, + terminate_channels(), + State1 = close_connection(State), ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod, Protocol), - NewState. + State1#v1.sock, 0, CloseMethod, Protocol), + State1. internal_emit_stats(State = #v1{stats_timer = StatsTimer}) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl index 795413aa..9821ae7b 100644 --- a/src/rabbit_registry.erl +++ b/src/rabbit_registry.erl @@ -48,7 +48,7 @@ start_link() -> %%--------------------------------------------------------------------------- register(Class, TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}, infinity). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 59862821..09695d95 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -585,32 +585,131 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). -test_topic_match(P, R) -> - test_topic_match(P, R, true). - -test_topic_match(P, R, Expected) -> - case rabbit_exchange_type_topic:topic_matches(list_to_binary(P), - list_to_binary(R)) of - Expected -> - passed; - _ -> - {topic_match_failure, P, R} - end. - test_topic_matching() -> - passed = test_topic_match("#", "test.test"), - passed = test_topic_match("#", ""), - passed = test_topic_match("#.T.R", "T.T.R"), - passed = test_topic_match("#.T.R", "T.R.T.R"), - passed = test_topic_match("#.Y.Z", "X.Y.Z.X.Y.Z"), - passed = test_topic_match("#.test", "test"), - passed = test_topic_match("#.test", "test.test"), - passed = test_topic_match("#.test", "ignored.test"), - passed = test_topic_match("#.test", "more.ignored.test"), - passed = test_topic_match("#.test", "notmatched", false), - passed = test_topic_match("#.z", "one.two.three.four", false), + XName = #resource{virtual_host = <<"/">>, + kind = exchange, + name = <<"test_exchange">>}, + X = #exchange{name = XName, type = topic, durable = false, + auto_delete = false, arguments = []}, + %% create + rabbit_exchange_type_topic:validate(X), + exchange_op_callback(X, create, []), + + %% add some bindings + Bindings = lists:map( + fun ({Key, Q}) -> + #binding{source = XName, + key = list_to_binary(Key), + destination = #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)}} + end, [{"a.b.c", "t1"}, + {"a.*.c", "t2"}, + {"a.#.b", "t3"}, + {"a.b.b.c", "t4"}, + {"#", "t5"}, + {"#.#", "t6"}, + {"#.b", "t7"}, + {"*.*", "t8"}, + {"a.*", "t9"}, + {"*.b.c", "t10"}, + {"a.#", "t11"}, + {"a.#.#", "t12"}, + {"b.b.c", "t13"}, + {"a.b.b", "t14"}, + {"a.b", "t15"}, + {"b.c", "t16"}, + {"", "t17"}, + {"*.*.*", "t18"}, + {"vodka.martini", "t19"}, + {"a.b.c", "t20"}, + {"*.#", "t21"}, + {"#.*.#", "t22"}, + {"*.#.#", "t23"}, + {"#.#.#", "t24"}, + {"*", "t25"}, + {"#.b.#", "t26"}]), + lists:foreach(fun (B) -> exchange_op_callback(X, add_binding, [B]) end, + Bindings), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t1", "t2", "t5", "t6", "t10", "t11", "t12", + "t18", "t20", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b", ["t3", "t5", "t6", "t7", "t8", "t9", "t11", + "t12", "t15", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.b.b", ["t3", "t5", "t6", "t7", "t11", "t12", "t14", + "t18", "t21", "t22", "t23", "t24", "t26"]}, + {"", ["t5", "t6", "t17", "t24"]}, + {"b.c.c", ["t5", "t6", "t18", "t21", "t22", "t23", "t24", + "t26"]}, + {"a.a.a.a.a", ["t5", "t6", "t11", "t12", "t21", "t22", "t23", + "t24"]}, + {"vodka.gin", ["t5", "t6", "t8", "t21", "t22", "t23", + "t24"]}, + {"vodka.martini", ["t5", "t6", "t8", "t19", "t21", "t22", "t23", + "t24"]}, + {"b.b.c", ["t5", "t6", "t10", "t13", "t18", "t21", "t22", + "t23", "t24", "t26"]}, + {"nothing.here.at.all", ["t5", "t6", "t21", "t22", "t23", "t24"]}, + {"oneword", ["t5", "t6", "t21", "t22", "t23", "t24", + "t25"]}]), + + %% remove some bindings + RemovedBindings = [lists:nth(1, Bindings), lists:nth(5, Bindings), + lists:nth(11, Bindings), lists:nth(19, Bindings), + lists:nth(21, Bindings)], + exchange_op_callback(X, remove_bindings, [RemovedBindings]), + RemainingBindings = ordsets:to_list( + ordsets:subtract(ordsets:from_list(Bindings), + ordsets:from_list(RemovedBindings))), + + %% test some matches + test_topic_expect_match(X, + [{"a.b.c", ["t2", "t6", "t10", "t12", "t18", "t20", "t22", + "t23", "t24", "t26"]}, + {"a.b", ["t3", "t6", "t7", "t8", "t9", "t12", "t15", + "t22", "t23", "t24", "t26"]}, + {"a.b.b", ["t3", "t6", "t7", "t12", "t14", "t18", "t22", + "t23", "t24", "t26"]}, + {"", ["t6", "t17", "t24"]}, + {"b.c.c", ["t6", "t18", "t22", "t23", "t24", "t26"]}, + {"a.a.a.a.a", ["t6", "t12", "t22", "t23", "t24"]}, + {"vodka.gin", ["t6", "t8", "t22", "t23", "t24"]}, + {"vodka.martini", ["t6", "t8", "t22", "t23", "t24"]}, + {"b.b.c", ["t6", "t10", "t13", "t18", "t22", "t23", + "t24", "t26"]}, + {"nothing.here.at.all", ["t6", "t22", "t23", "t24"]}, + {"oneword", ["t6", "t22", "t23", "t24", "t25"]}]), + + %% remove the entire exchange + exchange_op_callback(X, delete, [RemainingBindings]), + %% none should match now + test_topic_expect_match(X, [{"a.b.c", []}, {"b.b.c", []}, {"", []}]), passed. +exchange_op_callback(X, Fun, ExtraArgs) -> + rabbit_misc:execute_mnesia_transaction( + fun () -> rabbit_exchange:callback(X, Fun, [true, X] ++ ExtraArgs) end), + rabbit_exchange:callback(X, Fun, [false, X] ++ ExtraArgs). + +test_topic_expect_match(X, List) -> + lists:foreach( + fun ({Key, Expected}) -> + BinKey = list_to_binary(Key), + Res = rabbit_exchange_type_topic:route( + X, #delivery{message = #basic_message{routing_key = + BinKey}}), + ExpectedRes = lists:map( + fun (Q) -> #resource{virtual_host = <<"/">>, + kind = queue, + name = list_to_binary(Q)} + end, Expected), + true = (lists:usort(ExpectedRes) =:= lists:usort(Res)) + end, List). + test_app_management() -> %% starting, stopping, status ok = control_action(stop_app, []), @@ -1019,9 +1118,9 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, - user(<<"user">>), <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, self(), Writer, rabbit_framing_amqp_0_9_1, user(<<"user">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1079,9 +1178,9 @@ test_server_status() -> test_spawn(Receiver) -> Me = self(), Writer = spawn(fun () -> Receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, Me, Writer, user(<<"guest">>), - <<"/">>, [], self(), - fun (_) -> {ok, self()} end), + {ok, Ch} = rabbit_channel:start_link( + 1, Me, Writer, rabbit_framing_amqp_0_9_1, user(<<"guest">>), + <<"/">>, [], self(), fun (_) -> {ok, self()} end), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -1233,7 +1332,7 @@ must_exit(Fun) -> end. test_delegates_sync(SecondaryNode) -> - Sender = fun (Pid) -> gen_server:call(Pid, invoked) end, + Sender = fun (Pid) -> gen_server:call(Pid, invoked, infinity) end, BadSender = fun (_Pid) -> exit(exception) end, Responder = make_responder(fun ({'$gen_call', From, invoked}) -> @@ -1305,7 +1404,7 @@ test_queue_cleanup(_SecondaryNode) -> rabbit_channel:do(Ch, #'queue.declare'{ passive = true, queue = ?CLEANUP_QUEUE_NAME }), receive - {channel_exit, 1, {amqp_error, not_found, _, _}} -> + #'channel.close'{reply_code = 404} -> ok after 2000 -> throw(failed_to_receive_channel_exit) diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index b0a71523..89acc10c 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -98,7 +98,6 @@ vertices(Module, Steps) -> edges(_Module, Steps) -> [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. - unknown_heads(Heads, G) -> [H || H <- Heads, digraph:vertex(G, H) =:= false]. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 68b88b3e..b9dbe418 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -25,6 +25,7 @@ -rabbit_upgrade({add_ip_to_listener, []}). -rabbit_upgrade({internal_exchanges, []}). -rabbit_upgrade({user_to_internal_user, [hash_passwords]}). +-rabbit_upgrade({topic_trie, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(add_ip_to_listener/0 :: () -> 'ok'). -spec(internal_exchanges/0 :: () -> 'ok'). -spec(user_to_internal_user/0 :: () -> 'ok'). +-spec(topic_trie/0 :: () -> 'ok'). -endif. @@ -47,7 +49,7 @@ %% point. remove_user_scope() -> - mnesia( + transform( rabbit_user_permission, fun ({user_permission, UV, {permission, _Scope, Conf, Write, Read}}) -> {user_permission, UV, {permission, Conf, Write, Read}} @@ -55,7 +57,7 @@ remove_user_scope() -> [user_vhost, permission]). hash_passwords() -> - mnesia( + transform( rabbit_user, fun ({user, Username, Password, IsAdmin}) -> Hash = rabbit_auth_backend_internal:hash_password(Password), @@ -64,7 +66,7 @@ hash_passwords() -> [username, password_hash, is_admin]). add_ip_to_listener() -> - mnesia( + transform( rabbit_listener, fun ({listener, Node, Protocol, Host, Port}) -> {listener, Node, Protocol, Host, {0,0,0,0}, Port} @@ -77,27 +79,41 @@ internal_exchanges() -> fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> {exchange, Name, Type, Durable, AutoDelete, false, Args} end, - [ ok = mnesia(T, - AddInternalFun, - [name, type, durable, auto_delete, internal, arguments]) + [ ok = transform(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) || T <- Tables ], ok. user_to_internal_user() -> - mnesia( + transform( rabbit_user, fun({user, Username, PasswordHash, IsAdmin}) -> {internal_user, Username, PasswordHash, IsAdmin} end, [username, password_hash, is_admin], internal_user). +topic_trie() -> + create(rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, + {attributes, [trie_edge, node_id]}, + {type, ordered_set}]), + create(rabbit_topic_trie_binding, [{record_name, topic_trie_binding}, + {attributes, [trie_binding, value]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- -mnesia(TableName, Fun, FieldList) -> +transform(TableName, Fun, FieldList) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. -mnesia(TableName, Fun, FieldList, NewRecordName) -> +transform(TableName, Fun, FieldList, NewRecordName) -> + rabbit_mnesia:wait_for_tables([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. + +create(Tab, TabDef) -> + {atomic, ok} = mnesia:create_table(Tab, TabDef), + ok. |