diff options
author | Matthew Sackman <matthew@lshift.net> | 2009-08-25 14:00:50 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2009-08-25 14:00:50 +0100 |
commit | 431942e4d954e090370f900653e730eb4005b141 (patch) | |
tree | 841aba5d1389a816f3beb412db8019742bf8204a | |
parent | 1b8a5d0be0482e66d9652fc231a8be09df2b0add (diff) | |
parent | 4322cc73ba7cbc22f1299482a9352cff77cbf704 (diff) | |
download | rabbitmq-server-431942e4d954e090370f900653e730eb4005b141.tar.gz |
merging in from default
51 files changed, 1744 insertions, 584 deletions
@@ -12,6 +12,11 @@ syntax: regexp ^src/rabbit_framing.erl$ ^rabbit.plt$ ^ebin/rabbit.app$ +^ebin/rabbit.rel$ +^ebin/rabbit.boot$ +^ebin/rabbit.script$ +^plugins/ +^priv/plugins/ ^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$ ^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$ @@ -20,10 +20,10 @@ PYTHON=python ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are -# only available in R12B-5 upwards +# only available in R13B upwards (R13B is eshell 5.7.1) # # NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.4" ]; then echo "true"; else echo "false"; fi) +USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi) endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests @@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e -# for the moment we don't use boot files because they introduce a -# dependency on particular versions of OTP applications -#all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app @@ -101,8 +98,8 @@ run-tests: all start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \ - ./scripts/rabbitmq-server -detached; sleep 1 + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ + ./scripts/rabbitmq-server ; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -116,8 +113,11 @@ force-snapshot: all stop-node: -$(ERL_CALL) -q +# code coverage will be created for subdirectory "ebin" of COVER_DIR +COVER_DIR=. + start-cover: all - echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL) + echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -134,10 +134,10 @@ srcdist: distclean cp README.in $(TARGET_SRC_DIR)/README elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \ >> $(TARGET_SRC_DIR)/BUILD - sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save + sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -148,7 +148,7 @@ srcdist: distclean rm -rf $(TARGET_SRC_DIR) distclean: clean - make -C $(AMQP_CODEGEN_DIR) distclean + $(MAKE) -C $(AMQP_CODEGEN_DIR) distclean rm -rf dist find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; @@ -163,7 +163,8 @@ distclean: clean docs_all: $(MANPAGES) -install: all docs_all +install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +install: all docs_all install_dirs @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) @@ -172,13 +173,17 @@ install: all docs_all cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - mkdir -p $(SBIN_DIR) - cp scripts/rabbitmq-server $(SBIN_DIR) - cp scripts/rabbitmqctl $(SBIN_DIR) - cp scripts/rabbitmq-multi $(SBIN_DIR) + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \ + cp scripts/$$script $(TARGET_DIR)/sbin; \ + [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ + done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ for manpage in docs/*.$$section.pod; do \ cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ done; \ done + +install_dirs: + mkdir -p $(SBIN_DIR) + mkdir -p $(TARGET_DIR)/sbin diff --git a/calculate-relative b/calculate-relative new file mode 100755 index 00000000..3af18e8f --- /dev/null +++ b/calculate-relative @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# relpath.py +# R.Barran 30/08/2004 +# Retrieved from http://code.activestate.com/recipes/302594/ + +import os +import sys + +def relpath(target, base=os.curdir): + """ + Return a relative path to the target from either the current dir or an optional base dir. + Base can be a directory specified either as absolute or relative to current dir. + """ + + if not os.path.exists(target): + raise OSError, 'Target does not exist: '+target + + if not os.path.isdir(base): + raise OSError, 'Base is not a directory or does not exist: '+base + + base_list = (os.path.abspath(base)).split(os.sep) + target_list = (os.path.abspath(target)).split(os.sep) + + # On the windows platform the target may be on a completely different drive from the base. + if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]: + raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper() + + # Starting from the filepath root, work out how much of the filepath is + # shared by base and target. + for i in range(min(len(base_list), len(target_list))): + if base_list[i] <> target_list[i]: break + else: + # If we broke out of the loop, i is pointing to the first differing path elements. + # If we didn't break out of the loop, i is pointing to identical path elements. + # Increment i so that in all cases it points to the first differing path elements. + i+=1 + + rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:] + if (len(rel_list) == 0): + return "." + return os.path.join(*rel_list) + +if __name__ == "__main__": + print(relpath(sys.argv[1], sys.argv[2])) diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod new file mode 100644 index 00000000..6bf9f6c4 --- /dev/null +++ b/docs/rabbitmq-activate-plugins.1.pod @@ -0,0 +1,35 @@ +=head1 NAME + +rabbitmq-activate-plugins - command line tool for activating plugins in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-activate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-activate-plugins is a command line tool for activating plugins installed +into the broker's plugins directory. + +=head1 EXAMPLES + +To activate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-activate-plugins + +=head1 SEE ALSO + +rabbitmq.conf(5), rabbitmq-multi(1), rabbitmq-server(1), rabbitmqctl(1) + +=head1 AUTHOR + +The RabbitMQ Team <info@rabbitmq.com> + +=head1 REFERENCES + +RabbitMQ Web Site: http://www.rabbitmq.com diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod index 23fd96ed..63848756 100644 --- a/docs/rabbitmq-multi.1.pod +++ b/docs/rabbitmq-multi.1.pod @@ -21,7 +21,7 @@ See also rabbitmq-server(1) for configuration information. start_all I<count> start count nodes with unique names, listening on all IP addresses - and on sequential ports starting from 5672. +and on sequential ports starting from 5672. status print the status of all running RabbitMQ nodes diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod index 99a7cecc..04062b1a 100644 --- a/docs/rabbitmq-server.1.pod +++ b/docs/rabbitmq-server.1.pod @@ -21,34 +21,32 @@ process or use rabbitmqctl(1). =head1 ENVIRONMENT B<RABBITMQ_MNESIA_BASE> - Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory - where Mnesia database files should be placed. + Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory +where Mnesia database files should be placed. B<RABBITMQ_LOG_BASE> Defaults to /var/log/rabbitmq. Log files generated by the server - will be placed in this directory. +will be placed in this directory. B<RABBITMQ_NODENAME> Defaults to rabbit. This can be useful if you want to run more - than one node per machine - B<RABBITMQ_NODENAME> should be unique - per erlang-node-and-machine combination. See clustering on a - single machine guide at - http://www.rabbitmq.com/clustering.html#single-machine for - details. +than one node per machine - B<RABBITMQ_NODENAME> should be unique per +erlang-node-and-machine combination. See clustering on a single +machine guide at +http://www.rabbitmq.com/clustering.html#single-machine for details. B<RABBITMQ_NODE_IP_ADDRESS> Defaults to 0.0.0.0. This can be changed if you only want to bind - to one network interface. +to one network interface. B<RABBITMQ_NODE_PORT> Defaults to 5672. B<RABBITMQ_CLUSTER_CONFIG_FILE> Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is - present it is used by the server to auto-configure a RabbitMQ - cluster. - See the clustering guide at http://www.rabbitmq.com/clustering.html - for details. +present it is used by the server to auto-configure a RabbitMQ cluster. +See the clustering guide at http://www.rabbitmq.com/clustering.html +for details. =head1 OPTIONS diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod index 9b2536c3..4d522163 100644 --- a/docs/rabbitmq.conf.5.pod +++ b/docs/rabbitmq.conf.5.pod @@ -18,12 +18,12 @@ built-in default values. For example, for the B<RABBITMQ_NODENAME> setting, B<RABBITMQ_NODENAME> from the environment is checked first. If it is absent or equal to - the empty string, then +the empty string, then B<NODENAME> from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent - or set equal to the empty string then the default value from - the startup script is used. +or set equal to the empty string then the default value from the +startup script is used. The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the environment variable names, with the B<RABBITMQ_> prefix removed: diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 42156896..58fbb100 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -20,16 +20,16 @@ It performs all actions by connecting to one of the broker's nodes. B<-n> I<node> default node is C<rabbit@server>, where server is the local host. - On a host named C<server.example.com>, the node name of the - RabbitMQ Erlang node will usually be rabbit@server (unless - RABBITMQ_NODENAME has been set to some non-default value at broker - startup time). The output of hostname -s is usually the correct - suffix to use after the "@" sign. See rabbitmq-server(1) for - details of configuring the RabbitMQ broker. +On a host named C<server.example.com>, the node name of the RabbitMQ +Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME +has been set to some non-default value at broker startup time). The +output of hostname -s is usually the correct suffix to use after the +"@" sign. See rabbitmq-server(1) for details of configuring the +RabbitMQ broker. B<-q> - quiet output mode is selected with the B<-q> flag. Informational - messages are suppressed when quiet mode is in effect. + quiet output mode is selected with the B<-q> flag. Informational +messages are suppressed when quiet mode is in effect. =head1 COMMANDS @@ -40,53 +40,51 @@ stop stop_app stop the RabbitMQ application, leaving the Erlang node running. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. +This command is typically run prior to performing other management +actions that require the RabbitMQ application to be stopped, +e.g. I<reset>. start_app start the RabbitMQ application. This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I<reset>. +actions that require the RabbitMQ application to be stopped, +e.g. I<reset>. status display various information about the RabbitMQ broker, such as - whether the RabbitMQ application on the current node, its version - number, what nodes are part of the broker, which of these are - running. +whether the RabbitMQ application on the current node, its version +number, what nodes are part of the broker, which of these are running. -force +reset return a RabbitMQ node to its virgin state. Removes the node from any cluster it belongs to, removes all data - from the management database, such as configured users, vhosts and - deletes all persistent messages. +from the management database, such as configured users, vhosts and +deletes all persistent messages. force_reset - the same as I<force> command, but resets the node unconditionally, - regardless of the current management database state and cluster - configuration. + the same as I<reset> command, but resets the node unconditionally, +regardless of the current management database state and cluster +configuration. It should only be used as a last resort if the database or cluster - configuration has been corrupted. +configuration has been corrupted. rotate_logs [suffix] instruct the RabbitMQ node to rotate the log files. The RabbitMQ - broker will attempt to append the current contents of the log file - to the file with the name composed of the original name and the - suffix. It will create a new file if such a file does not already - exist. When no I<suffix> is specified, the empty log file is - simply created at the original location; no rotation takes place. - When an error occurs while appending the contents of the old log - file, the operation behaves in the same way as if no I<suffix> was - specified. +broker will attempt to append the current contents of the log file to +the file with the name composed of the original name and the +suffix. It will create a new file if such a file does not already +exist. When no I<suffix> is specified, the empty log file is simply +created at the original location; no rotation takes place. When an +error occurs while appending the contents of the old log file, the +operation behaves in the same way as if no I<suffix> was specified. This command might be helpful when you are e.g. writing your own - logrotate script and you do not want to restart the RabbitMQ node. +logrotate script and you do not want to restart the RabbitMQ node. cluster I<clusternode> ... instruct the node to become member of a cluster with the specified - nodes determined by I<clusternode> option(s). - See http://www.rabbitmq.com/clustering.html for more information - about clustering. +nodes determined by I<clusternode> option(s). See +http://www.rabbitmq.com/clustering.html for more information about +clustering. =head2 USER MANAGEMENT @@ -110,35 +108,35 @@ add_vhost I<vhostpath> delete_vhost I<vhostpath> delete a virtual host I<vhostpath>. That command deletes also all its exchanges, queues and user - mappings. +mappings. list_vhosts list all virtual hosts. set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp> set the permissions for the user named I<username> in the virtual - host I<vhostpath>, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I<regexp>, respectively. +host I<vhostpath>, granting 'configure', 'write' and 'read' access to +resources with names matching the first, second and third I<regexp>, +respectively. clear_permissions [-p I<vhostpath>] I<username> remove the permissions for the user named I<username> in the - virtual host I<vhostpath>. +virtual host I<vhostpath>. list_permissions [-p I<vhostpath>] list all the users and their permissions in the virtual host - I<vhostpath>. +I<vhostpath>. list_user_permissions I<username> list the permissions of the user named I<username> across all - virtual hosts. +virtual hosts. =head2 SERVER STATUS list_queues [-p I<vhostpath>] [I<queueinfoitem> ...] list queue information by virtual host. If no I<queueinfoitem>s - are specified then then name and number of messages is displayed - for each queue. +are specified then then name and number of messages is displayed for +each queue. =head3 Queue information items @@ -163,8 +161,7 @@ messages_ready number of messages ready to be delivered to clients messages_unacknowledged - number of messages delivered to clients but not yet - acknowledged + number of messages delivered to clients but not yet acknowledged messages_uncommitted number of messages published in as yet uncommitted transactions @@ -174,7 +171,7 @@ messages acks_uncommitted number of acknowledgements received in as yet uncommitted - transactions +transactions consumers number of consumers @@ -184,14 +181,14 @@ transactions memory bytes of memory consumed by the Erlang process for the queue, - including stack, heap and internal structures +including stack, heap and internal structures =back list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...] list exchange information by virtual host. If no - I<exchangeinfoitem>s are specified then name and type is displayed - for each exchange. +I<exchangeinfoitem>s are specified then name and type is displayed for +each exchange. =head3 Exchange information items @@ -216,11 +213,11 @@ arguments list_bindings [-p I<vhostpath>] list bindings by virtual host. Each line contains exchange name, - routing key and queue name (all URL encoded) and arguments. +routing key and queue name (all URL encoded) and arguments. list_connections [I<connectioninfoitem> ...] list connection information. If no I<connectioninfoitem>s are - specified then the user, peer address and peer port are displayed. +specified then the user, peer address and peer port are displayed. =head3 Connection information items @@ -243,7 +240,7 @@ peer_port state connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>, - B<running>, B<closing>, B<closed>) +B<running>, B<closing>, B<closed>) channels number of channels using the connection diff --git a/ebin/rabbit.rel b/ebin/rabbit.rel deleted file mode 100644 index c2d2067b..00000000 --- a/ebin/rabbit.rel +++ /dev/null @@ -1,7 +0,0 @@ -{release, {"rabbit", "1.1.0-alpha"}, {erts, "1.14.2"}, - [{rabbit, "1.1.0-alpha"}, - {mnesia, "4.3.4"}, - {os_mon, "2.1.2"}, - {sasl, "2.1.5"}, - {stdlib, "1.14.4"}, - {kernel, "2.11.4"}]}. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 965da130..6fc6e464 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -1,7 +1,7 @@ {application, rabbit, %% -*- erlang -*- [{description, "RabbitMQ"}, {id, "RabbitMQ"}, - {vsn, "%%VERSION%%"}, + {vsn, "%%VSN%%"}, {modules, []}, {registered, [rabbit_amqqueue_sup, rabbit_log, diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c74d4533..fa2844fd 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -1,7 +1,8 @@ -VERSION=0.0.0 -SOURCE_TARBALL_DIR=../../../dist +TARBALL_DIR=../../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + TOP_DIR=$(shell pwd) #Under debian we do not want to check build dependencies, since that #only checks build-dependencies using rpms, not debs @@ -23,13 +24,16 @@ rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp - cp $(TOP_DIR)/$(TARBALL) SOURCES + cp $(TARBALL_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec - cp init.d SOURCES/rabbitmq-server.init cp ${COMMON_DIR}/* SOURCES/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + SOURCES/rabbitmq-server.init cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 9e7c4bfb..7f442831 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate +Source4: rabbitmq-asroot-script-wrapper URL: http://www.rabbitmq.com/ BuildRequires: erlang, python-simplejson Requires: erlang, logrotate @@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` +%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -34,11 +36,9 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} - -# The rabbitmq build needs escript, which is missing from /usr/bin in -# some versions of the erlang RPM. See -# <https://bugzilla.redhat.com/show_bug.cgi?id=481302> -PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags} +cp %{S:4} %{_rabbit_asroot_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} +make %{?_smp_mflags} %install rm -rf %{buildroot} @@ -55,6 +55,7 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper new file mode 100644 index 00000000..0dd1c0fb --- /dev/null +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -0,0 +1,53 @@ +#!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index 296a77d1..f1a9b1ff 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -1,4 +1,35 @@ #!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + # Escape spaces and quotes, because shell is revolting. for arg in "$@" ; do # Escape quotes in parameters, so that they're passed through cleanly. diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init index 77a6a89a..e71562f8 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/common/rabbitmq-server.init @@ -8,10 +8,10 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server -# Default-Start: -# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network +# Default-Start: +# Default-Stop: # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -24,13 +24,14 @@ USER=rabbitmq NODE_COUNT=1 ROTATE_SUFFIX= -LOCK_FILE=/var/lock/subsys/$NAME +DEFAULTS_FILE= # This is filled in when building packages +LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/sysconfig/rabbitmq ] ; then - . /etc/sysconfig/rabbitmq +if [ -f "$DEFAULTS_FILE" ] ; then + . $DEFAULTS_FILE fi RETVAL=0 @@ -41,7 +42,8 @@ start_rabbitmq () { $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err case "$?" in 0) - echo SUCCESS && touch $LOCK_FILE + echo SUCCESS + [ -n "$LOCK_FILE" ] && touch $LOCK_FILE RETVAL=0 ;; 1) @@ -52,7 +54,7 @@ start_rabbitmq () { echo FAILED - check /var/log/rabbitmq/startup_log, _err RETVAL=1 ;; - esac + esac set -e } @@ -62,10 +64,12 @@ stop_rabbitmq () { if [ $RETVAL = 0 ] ; then $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err + if [ $RETVAL = 0 ] ; then + # Try to stop epmd if run by the rabbitmq user + pkill -u rabbitmq epmd || : + [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else - rm -rf $LOCK_FILE + echo FAILED - check /var/log/rabbitmq/shutdown_log, _err fi else echo No nodes running @@ -119,19 +123,14 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - condrestart|try-restart) + force-reload|reload|restart|condrestart|try-restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=2 + RETVAL=1 ;; esac diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 67fabae0..dafaf9ce 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,8 +1,9 @@ TARBALL_DIR=../../../dist -TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server SIGNING_KEY_ID=056E8E56 @@ -21,6 +22,10 @@ package: clean tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + $(UNPACKED_DIR)/debian/rabbitmq-server.init chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d deleted file mode 100644 index a35a60ec..00000000 --- a/packaging/debs/Debian/debian/init.d +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/sh -### BEGIN INIT INFO -# Provides: rabbitmq -# Required-Start: $remote_fs $network -# Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Description: RabbitMQ broker -# Short-Description: Enable AMQP service provided by RabbitMQ broker -### END INIT INFO - -PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi -NAME=rabbitmq-server -DESC=rabbitmq-server -USER=rabbitmq -NODE_COUNT=1 -ROTATE_SUFFIX= - -test -x $DAEMON || exit 0 - -# Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq -fi - -RETVAL=0 -set -e - -start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err - case "$?" in - 0) - echo SUCCESS - RETVAL=0 - ;; - 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err - RETVAL=1 - ;; - esac - set -e -} - -stop_rabbitmq () { - set +e - status_rabbitmq quiet - if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err - RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err - fi - else - echo No nodes running - RETVAL=0 - fi - set -e -} - -status_rabbitmq() { - set +e - if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 - else - $DAEMON status > /dev/null 2>&1 - fi - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -rotate_logs_rabbitmq() { - set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -restart_rabbitmq() { - stop_rabbitmq - start_rabbitmq -} - -case "$1" in - start) - echo -n "Starting $DESC: " - start_rabbitmq - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - stop_rabbitmq - echo "$NAME." - ;; - status) - status_rabbitmq - ;; - rotate-logs) - echo -n "Rotating log files for $DESC: " - rotate_logs_rabbitmq - ;; - force-reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - *) - echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2 - RETVAL=1 - ;; -esac - -exit $RETVAL diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 31904851..365eea6e 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -3,7 +3,7 @@ include /usr/share/cdbs/1/rules/debhelper.mk include /usr/share/cdbs/1/class/makefile.mk -RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ +RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/ DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/ @@ -17,3 +17,6 @@ install/rabbitmq-server:: for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done + for script in rabbitmq-activate-plugins; do \ + install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ + done diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index b3988696..4eade6c7 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) \ + $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index b8096d20..1826d5c4 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -42,7 +42,7 @@ use_parallel_build yes build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ - TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ + TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man @@ -61,9 +61,7 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${sbindir}/rabbitmq-env reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ ${sbindir}/rabbitmq-multi \ ${sbindir}/rabbitmq-server \ @@ -83,14 +81,19 @@ post-destroot { xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-multi + xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \ + ${wrappersbin}/rabbitmq-activate-plugins reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ ${wrappersbin}/rabbitmq-multi reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-activate-plugins + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-activate-plugins file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper new file mode 100644 index 00000000..c4488dcb --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper @@ -0,0 +1,12 @@ +#!/bin/bash +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 59101cb2..387becb3 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,15 +4,16 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) + $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/rabbitmq-activate-plugins b/scripts/rabbitmq-activate-plugins new file mode 100755 index 00000000..5ce64c68 --- /dev/null +++ b/scripts/rabbitmq-activate-plugins @@ -0,0 +1,47 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +. `dirname $0`/rabbitmq-env + +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins" + +exec erl \ + -pa "$RABBITMQ_EBIN" \ + -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ + -noinput \ + -hidden \ + -s rabbit_plugin_activator \ + -extra "$@" diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat new file mode 100644 index 00000000..3540bf2d --- /dev/null +++ b/scripts/rabbitmq-activate-plugins.bat @@ -0,0 +1,49 @@ +@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
+set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env new file mode 100755 index 00000000..69ddbcfe --- /dev/null +++ b/scripts/rabbitmq-env @@ -0,0 +1,53 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Determine where this script is really located +SCRIPT_PATH="$0" +while [ -h "$SCRIPT_PATH" ] ; do + FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` + if [ "$?" != "0" ]; then + REL_PATH=`readlink $SCRIPT_PATH` + if expr "$REL_PATH" : '/.*' > /dev/null; then + SCRIPT_PATH="$REL_PATH" + else + SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH" + fi + else + SCRIPT_PATH=$FULL_PATH + fi +done + +SCRIPT_DIR=`dirname $SCRIPT_PATH` +RABBITMQ_HOME="${SCRIPT_DIR}/.." + +# Load configuration from the rabbitmq.conf file +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1d0c785f..7db4cb70 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -60,7 +60,7 @@ export \ set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a30c0889..8abf13f1 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 8502d60a..547220b4 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -41,7 +41,7 @@ LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia SERVER_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -75,16 +75,25 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' +RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" +if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then + RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" + RABBITMQ_EBIN_PATH="" +else + RABBITMQ_BOOT_FILE=start_sasl + RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}" +fi + # we need to turn off path expansion because some of the vars, notably # RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and # there is no other way of preventing their expansion. set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + ${RABBITMQ_EBIN_PATH} \ ${RABBITMQ_START_RABBIT} \ -sname ${RABBITMQ_NODENAME} \ - -boot start_sasl \ + -boot ${RABBITMQ_BOOT_FILE} \ +W w \ ${RABBITMQ_SERVER_ERL_ARGS} \ -rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \ diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 9915727b..a784fee3 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -84,10 +80,10 @@ set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%" set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+ type %LOGS% >> %LOGS_BACKUP%
)
if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+ type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
)
rem End of log management
@@ -104,11 +100,20 @@ set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%" if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
)
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot start_sasl ^
+-boot %RABBITMQ_BOOT_FILE% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index c57978c0..9c45e73d 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,12 +30,12 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index e4dccfba..5111724f 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ba8becfc..36fb4fa8 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -21,6 +21,42 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% +%% 5) The callback module can optionally implement +%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be +%% called immediately prior to and post hibernation, respectively. If +%% handle_pre_hibernate returns {hibernate, NewState} then the process +%% will hibernate. If the module does not implement +%% handle_pre_hibernate/1 then the default action is to hibernate. +%% +%% 6) init can return a 4th arg, {backoff, InitialTimeout, +%% MinimumTimeout, DesiredHibernatePeriod} (all in +%% milliseconds). Then, on all callbacks which can return a timeout +%% (including init), timeout can be 'hibernate'. When this is the +%% case, the current timeout value will be used (initially, the +%% InitialTimeout supplied from init). After this timeout has +%% occurred, hibernation will occur as normal. Upon awaking, a new +%% current timeout value will be calculated. +%% +%% The purpose is that the gen_server2 takes care of adjusting the +%% current timeout value such that the process will increase the +%% timeout value repeatedly if it is unable to sleep for the +%% DesiredHibernatePeriod. If it is able to sleep for the +%% DesiredHibernatePeriod it will decrease the current timeout down to +%% the MinimumTimeout, so that the process is put to sleep sooner (and +%% hopefully stays asleep for longer). In short, should a process +%% using this receive a burst of messages, it should not hibernate +%% between those messages, but as the messages become less frequent, +%% the process will not only hibernate, it will do so sooner after +%% each message. +%% +%% When using this backoff mechanism, normal timeout values (i.e. not +%% 'hibernate') can still be used, and if they are used then the +%% handle_info(timeout, State) will be called as normal. In this case, +%% returning 'hibernate' from handle_info(timeout, State) will not +%% hibernate the process immediately, as it would if backoff wasn't +%% being used. Instead it'll wait for the current timeout as described +%% above. + %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -55,6 +91,7 @@ %%% init(Args) %%% ==> {ok, State} %%% {ok, State, Timeout} +%%% {ok, State, Timeout, Backoff} %%% ignore %%% {stop, Reason} %%% @@ -86,6 +123,17 @@ %%% %%% ==> ok %%% +%%% handle_pre_hibernate(State) +%%% +%%% ==> {hibernate, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called +%%% +%%% handle_post_hibernate(State) +%%% +%%% ==> {noreply, State} +%%% {stop, Reason, State} +%%% Reason = normal | shutdown | Term, terminate(State) is called %%% %%% The work flow (of the server) can be described as follows: %%% @@ -116,7 +164,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -290,7 +338,7 @@ multi_call(Nodes, Name, Req, Timeout) %%----------------------------------------------------------------- -%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_ +%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_ %% %% Description: Makes an existing process into a gen_server. %% The calling process will enter the gen_server receive @@ -301,20 +349,30 @@ multi_call(Nodes, Name, Req, Timeout) %% process, including registering a name for it. %%----------------------------------------------------------------- enter_loop(Mod, Options, State) -> - enter_loop(Mod, Options, State, self(), infinity). + enter_loop(Mod, Options, State, self(), infinity, undefined). + +enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> + enter_loop(Mod, Options, State, self(), infinity, Backoff); enter_loop(Mod, Options, State, ServerName = {_, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity); + enter_loop(Mod, Options, State, ServerName, infinity, undefined); enter_loop(Mod, Options, State, Timeout) -> - enter_loop(Mod, Options, State, self(), Timeout). + enter_loop(Mod, Options, State, self(), Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> + enter_loop(Mod, Options, State, ServerName, infinity, Backoff); enter_loop(Mod, Options, State, ServerName, Timeout) -> + enter_loop(Mod, Options, State, ServerName, Timeout, undefined). + +enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + Backoff1 = extend_backoff(Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -329,23 +387,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); + {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> + Backoff1 = extend_backoff(Backoff), + proc_lib:init_ack(Starter, {ok, self()}), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -354,33 +426,159 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +%% name(Pid) when is_pid(Pid) -> Pid; +%% when R11 goes away, drop the line beneath and uncomment the line above +name(Name) -> Name. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid. + +extend_backoff(undefined) -> + undefined; +extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue), Debug) - after 0 -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) + Input -> drain(in(Input, Queue)) + after 0 -> Queue + end. + +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, TimeoutState, Queue1, Debug, Msg); + {empty, Queue1} -> + {Time1, HibOnTimeout} + = case {Time, TimeoutState} of + {hibernate, {backoff, Current, _Min, _Desired, _RSt}} -> + {Current, true}; + {hibernate, _} -> + %% wake_hib/7 will set Time to hibernate. If + %% we were woken and didn't receive a msg + %% then we will get here and need a sensible + %% value for Time1, otherwise we crash. + %% R13B1 always waits infinitely when waking + %% from hibernation, so that's what we do + %% here too. + {infinity, false}; + _ -> {Time, false} + end, + receive + Input -> + %% Time could be 'hibernate' here, so *don't* call loop + process_next_msg( + Parent, Name, State, Mod, Time, TimeoutState, + drain(in(Input, Queue1)), Debug) + after Time1 -> + case HibOnTimeout of + true -> + pre_hibernate( + Parent, Name, State, Mod, TimeoutState, Queue1, + Debug); + false -> + process_msg( + Parent, Name, State, Mod, Time, TimeoutState, + Queue1, Debug, timeout) end end end. +wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) -> + TimeoutState1 = case TS of + undefined -> + undefined; + {SleptAt, TimeoutState} -> + adjust_timeout_state(SleptAt, now(), TimeoutState) + end, + post_hibernate(Parent, Name, State, Mod, TimeoutState1, + drain(Queue), Debug). + +hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + TS = case TimeoutState of + undefined -> undefined; + {backoff, _, _, _, _} -> {now(), TimeoutState} + end, + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + TS, Queue, Debug]). + +pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_pre_hibernate, 1) of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, + Debug); + Reply -> + handle_common_termination(Reply, Name, pre_hibernate, + Mod, State, Debug) + end; + false -> + hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) + end. + +post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + case erlang:function_exported(Mod, handle_post_hibernate, 1) of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + process_next_msg(Parent, Name, NState, Mod, infinity, + TimeoutState, Queue, Debug); + {noreply, NState, Time} -> + process_next_msg(Parent, Name, NState, Mod, Time, + TimeoutState, Queue, Debug); + Reply -> + handle_common_termination(Reply, Name, post_hibernate, + Mod, State, Debug) + end; + false -> + %% use hibernate here, not infinity. This matches + %% R13B. The key is that we should be able to get through + %% to process_msg calling sys:handle_system_msg with Time + %% still set to hibernate, iff that msg is the very msg + %% that woke us up (or the first msg we receive after + %% waking up). + process_next_msg(Parent, Name, State, Mod, hibernate, + TimeoutState, Queue, Debug) + end. + +adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, + DesiredHibPeriod, RandomState}) -> + NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + CurrentMicros = CurrentTO * 1000, + MinimumMicros = MinimumTO * 1000, + DesiredHibMicros = DesiredHibPeriod * 1000, + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + {Extra, RandomState1} = random:uniform_s(Base, RandomState), + CurrentTO1 = Base + Extra, + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}. + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> @@ -388,19 +586,25 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, + Debug, Msg) -> case Msg of {system, From, Req} -> - sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + sys:handle_system_msg + (Req, From, Parent, ?MODULE, Debug, + [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, + Debug1) end. %%% --------------------------------------------------- @@ -598,87 +802,95 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); - Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), reply(Name, From, Reply, NState, Debug), exit(R); Other -> - handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + handle_common_reply(Other, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, + TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); - {stop, Reason, NState} -> - terminate(Reason, Name, Msg, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, Msg, Mod, State, []); - _ -> - terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, + Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, + Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); + _ -> + handle_common_termination(Reply, Name, Msg, Mod, State, Debug) + end. + +handle_common_termination(Reply, Name, Msg, Mod, State, Debug) -> + case Reply of {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -696,16 +908,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +-ifdef(use_specs). +-spec system_terminate(_, _, _, [_]) -> no_return(). +-endif. + +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, + _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, + OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; - Else -> Else + {ok, NewState} -> + {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; + Else -> + Else end. %%----------------------------------------------------------------- @@ -747,6 +967,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) @@ -871,8 +1093,8 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = - StatusData, + [PDict, SysState, Parent, Debug, + [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); is_atom(Name) -> diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c4..c74b39a9 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,7 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index 29e23b6d..6ad22e7a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -133,6 +133,7 @@ start(normal, []) -> {"core processes", fun () -> ok = start_child(rabbit_log), + ok = rabbit_hooks:start(), ok = rabbit_amqqueue:start(), @@ -222,8 +223,21 @@ log_location(Type) -> print_banner() -> {ok, Product} = application:get_key(id), {ok, Version} = application:get_key(vsn), - io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n", - [Product, Version, + ProductLen = string:len(Product), + io:format("~n" + "+---+ +---+~n" + "| | | |~n" + "| | | |~n" + "| | | |~n" + "| +---+ +-------+~n" + "| |~n" + "| ~s +---+ |~n" + "| | | |~n" + "| ~s +---+ |~n" + "| |~n" + "+-------------------+~n" + "AMQP ~p-~p~n~s~n~s~n~n", + [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16..309c9a0e 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). @@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; + + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 198e2782..f05f7880 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -51,8 +51,6 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(CALL_TIMEOUT, 5000). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -305,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f..fe2e8509 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,8 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -export([start_link/1]). @@ -101,7 +102,8 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new()}, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +118,9 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. +noreply(NewState) -> {noreply, NewState, hibernate}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -813,11 +815,6 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); -handle_info(timeout, State) -> - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); - handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 2dc619c1..4033aaaf 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -35,6 +35,7 @@ -export([publish/1, message/4, properties/1, delivery/4]). -export([publish/4, publish/7]). +-export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -53,6 +54,8 @@ -spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), maybe(txn()), properties_input(), binary()) -> publish_result()). +-spec(build_content/2 :: (amqp_properties(), binary()) -> content()). +-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}). -endif. @@ -72,16 +75,26 @@ delivery(Mandatory, Immediate, Txn, Message) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message}. +build_content(Properties, BodyBin) -> + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + #content{class_id = ClassId, + properties = Properties, + properties_bin = none, + payload_fragments_rev = [BodyBin]}. + +from_content(Content) -> + #content{class_id = ClassId, + properties = Props, + payload_fragments_rev = FragmentsRev} = + rabbit_binary_parser:ensure_content_decoded(Content), + {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + {Props, list_to_binary(lists:reverse(FragmentsRev))}. + message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> Properties = properties(RawProperties), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), - Content = #content{class_id = ClassId, - properties = Properties, - properties_bin = none, - payload_fragments_rev = [BodyBin]}, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKeyBin, - content = Content, + content = build_content(Properties, BodyBin), persistent_key = none}. properties(P = #'P_basic'{}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3089bb62..16b7c938 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:cast(Pid, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). %%--------------------------------------------------------------------------- @@ -157,14 +157,16 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(timeout, State) -> ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). + {noreply, State, hibernate}. terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, state = terminating}) -> diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6649899a..37e4d189 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -36,7 +36,7 @@ -record(params, {quiet, node, command, args}). --define(RPC_TIMEOUT, 30000). +-define(RPC_TIMEOUT, infinity). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be00503..b789fbd1 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl new file mode 100644 index 00000000..b3d271c2 --- /dev/null +++ b/src/rabbit_hooks.erl @@ -0,0 +1,73 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_hooks). + +-export([start/0]). +-export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). + +-define(TableName, rabbit_hooks). + +-ifdef(use_specs). + +-spec(start/0 :: () -> 'ok'). +-spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). +-spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). +-spec(trigger/2 :: (atom(), list()) -> 'ok'). +-spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). + +-endif. + +start() -> + ets:new(?TableName, [bag, public, named_table]), + ok. + +subscribe(Hook, HandlerName, Handler) -> + ets:insert(?TableName, {Hook, HandlerName, Handler}), + ok. + +unsubscribe(Hook, HandlerName) -> + ets:match_delete(?TableName, {Hook, HandlerName, '_'}), + ok. + +trigger(Hook, Args) -> + Hooks = ets:lookup(?TableName, Hook), + [case catch apply(M, F, [Hook, Name, Args | A]) of + {'EXIT', Reason} -> + rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", + [Name, Hook, Reason]); + _ -> ok + end || {_, Name, {M, F, A}} <- Hooks], + ok. + +notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> + Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, + ok. diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 00000000..b0d57cb2 --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. +handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 00000000..3de2d843 --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e99..ca942d7c 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). -update() -> - gen_server:cast(?SERVER, update). +-endif. %%---------------------------------------------------------------------------- -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 72e16f0f..95a274e3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -41,6 +41,7 @@ -export([dirty_read/1]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). +-export([enable_cover/1, report_cover/1]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). @@ -49,9 +50,11 @@ -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -64,6 +67,8 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). @@ -87,8 +92,10 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). +-spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). @@ -97,7 +104,7 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -107,12 +114,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -188,17 +199,27 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> [Kind, Name, VHostPath])). enable_cover() -> - case cover:compile_beam_directory("ebin") of + enable_cover("."). + +enable_cover([Root]) when is_atom(Root) -> + enable_cover(atom_to_list(Root)); +enable_cover(Root) -> + case cover:compile_beam_directory(filename:join(Root, "ebin")) of {error,Reason} -> {error,Reason}; _ -> ok end. report_cover() -> - Dir = "cover/", - ok = filelib:ensure_dir(Dir), + report_cover("."). + +report_cover([Root]) when is_atom(Root) -> + report_cover(atom_to_list(Root)); +report_cover(Root) -> + Dir = filename:join(Root, "cover"), + ok = filelib:ensure_dir(filename:join(Dir,"junk")), lists:foreach(fun(F) -> file:delete(F) end, - filelib:wildcard(Dir ++ "*.html")), - {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]), + filelib:wildcard(filename:join(Dir, "*.html"))), + {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), {CT, NCT} = lists:foldl( fun(M,{CovTot, NotCovTot}) -> @@ -207,7 +228,7 @@ report_cover() -> Cov, NotCov, M), {ok,_} = cover:analyze_to_file( M, - Dir ++ atom_to_list(M) ++ ".html", + filename:join(Dir, atom_to_list(M) ++ ".html"), [html]), {CovTot+Cov, NotCovTot+NotCov} end, @@ -347,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -361,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -431,3 +460,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0a..37e20335 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -149,6 +149,11 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -192,28 +197,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of @@ -250,12 +243,10 @@ delete_cluster_nodes_config() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. init_db(ClusterNodes) -> - WasDiskNode = mnesia:system_info(use_dir), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + case mnesia:system_info(use_dir) of + true -> case check_schema_integrity() of ok -> ok; @@ -270,22 +261,18 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); - true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + false -> + ok = create_schema() end; {ok, [_|_]} -> - ok = wait_for_tables(), - ok = create_local_table_copies( - case IsDiskNode of - true -> disc; - false -> ram - end); + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end); {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -336,40 +323,36 @@ create_tables() -> table_definitions()), ok. +table_has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, lists:foreach( fun({Tab, TabDef}) -> - HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), - HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + HasDiscCopies = table_has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), StorageType = - case Type of - disc -> + if + Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies + true -> ram_copies end; %% unused code - commented out to keep dialyzer happy -%% disc_only -> +%% Type =:= disc_only -> %% if %% HasDiscCopies or HasDiscOnlyCopies -> %% disc_only_copies; %% true -> ram_copies %% end; - ram -> + Type =:= ram -> ram_copies end, ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl new file mode 100644 index 00000000..71278bfb --- /dev/null +++ b/src/rabbit_plugin_activator.erl @@ -0,0 +1,198 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_plugin_activator). + +-export([start/0, stop/0]). + +-define(DefaultPluginDir, "plugins"). +-define(DefaultUnpackedPluginDir, "priv/plugins"). +-define(DefaultRabbitEBin, "ebin"). +-define(BaseApps, [rabbit]). + +%%---------------------------------------------------------------------------- + +start() -> + %% Ensure Rabbit is loaded so we can access it's environment + application:load(rabbit), + + %% Determine our various directories + PluginDir = get_env(plugins_dir, ?DefaultPluginDir), + UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), + RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + + %% Unpack any .ez plugins + unpack_ez_plugins(PluginDir, UnpackedPluginDir), + + %% Build a list of required apps based on the fixed set, and any plugins + RequiredApps = ?BaseApps ++ + find_plugins(PluginDir) ++ + find_plugins(UnpackedPluginDir), + + %% Build the entire set of dependencies - this will load the + %% applications along the way + AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of + {unknown_app, {App, Err}} -> + io:format("ERROR: Failed to load application " ++ + "~s: ~p~n", [App, Err]), + halt(1); + AppList -> + AppList + end, + AppVersions = [determine_version(App) || App <- AllApps], + {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + + %% Build the overall release descriptor + RDesc = {release, + {"rabbit", RabbitVersion}, + {erts, erlang:system_info(version)}, + AppVersions}, + + %% Write it out to ebin/rabbit.rel + file:write_file(RabbitEBin ++ "/rabbit.rel", + io_lib:format("~p.~n", [RDesc])), + + %% Compile the script + case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + {ok, Module, Warnings} -> + %% This gets lots of spurious no-source warnings when we + %% have .ez files, so we want to supress them to prevent + %% hiding real issues. + WarningStr = Module:format_warning( + [W || W <- Warnings, + case W of + {warning, {source_not_found, _}} -> false; + _ -> true + end]), + case length(WarningStr) of + 0 -> ok; + _ -> io:format("~s", [WarningStr]) + end, + ok; + {error, Module, Error} -> + io:format("Boot file generation failed: ~s~n", + [Module:format_error(Error)]), + halt(1) + end, + halt(), + ok. + +stop() -> + ok. + +get_env(Key, Default) -> + case application:get_env(rabbit, Key) of + {ok, V} -> V; + _ -> Default + end. + +determine_version(App) -> + application:load(App), + {ok, Vsn} = application:get_key(App, vsn), + {App, Vsn}. + +assert_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> ok; + false -> + ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) + end. +delete_dir(Dir) -> + case filelib:is_dir(Dir) of + true -> + case file:list_dir(Dir) of + {ok, Files} -> + [case Dir ++ "/" ++ F of + Fn -> + case filelib:is_dir(Fn) and not(is_symlink(Fn)) of + true -> delete_dir(Fn); + false -> file:delete(Fn) + end + end || F <- Files] + end, + ok = file:del_dir(Dir); + false -> + ok + end. +is_symlink(Name) -> + case file:read_link(Name) of + {ok, _} -> true; + _ -> false + end. + +unpack_ez_plugins(PluginSrcDir, PluginDestDir) -> + %% Eliminate the contents of the destination directory + delete_dir(PluginDestDir), + + assert_dir(PluginDestDir), + [unpack_ez_plugin(PluginName, PluginDestDir) || + PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")]. + +unpack_ez_plugin(PluginFn, PluginDestDir) -> + zip:unzip(PluginFn, [{cwd, PluginDestDir}]), + ok. + +find_plugins(PluginDir) -> + [prepare_dir_plugin(PluginName) || + PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")]. + +prepare_dir_plugin(PluginAppDescFn) -> + %% Add the plugin ebin directory to the load path + PluginEBinDirN = filename:dirname(PluginAppDescFn), + code:add_path(PluginEBinDirN), + + %% We want the second-last token + NameTokens = string:tokens(PluginAppDescFn,"/."), + PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens), + list_to_atom(PluginNameString). + +expand_dependencies(Pending) -> + expand_dependencies(sets:new(), Pending). +expand_dependencies(Current, []) -> + Current; +expand_dependencies(Current, [Next|Rest]) -> + case sets:is_element(Next, Current) of + true -> + expand_dependencies(Current, Rest); + false -> + case application:load(Next) of + ok -> + ok; + {error, {already_loaded, _}} -> + ok; + X -> + throw({unknown_app, {Next, X}}) + end, + {ok, Required} = application:get_key(Next, applications), + Unique = [A || A <- Required, not(sets:is_element(A, Current))], + expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index a09783be..69dbc008 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 01757509..b4cd30bc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -33,6 +33,9 @@ -export([all_tests/0, test_parsing/0]). +%% Exported so the hook mechanism can call back +-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). + -import(lists). -include("rabbit.hrl"). @@ -46,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -54,6 +58,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_hooks(), passed. test_priority_queue() -> @@ -71,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -87,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -112,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -404,19 +483,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -432,11 +509,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -449,21 +527,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -601,6 +679,52 @@ test_server_status() -> passed. +test_hooks() -> + %% Firing of hooks calls all hooks in an isolated manner + rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:trigger(test_hook, [arg1, arg2]), + [arg1, arg2] = get(test_hook_test_fired), + [arg1, arg2] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Hook Deletion works + put(test_hook_test_fired, undefined), + put(test_hook_test2_fired, undefined), + rabbit_hooks:unsubscribe(test_hook, test), + rabbit_hooks:trigger(test_hook, [arg3, arg4]), + undefined = get(test_hook_test_fired), + [arg3, arg4] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Catches exceptions from bad hooks + rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), + ok = rabbit_hooks:trigger(test_hook3, []), + + %% Passing extra arguments to hooks + rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}), + rabbit_hooks:trigger(arg_hook, [arg1, arg2]), + {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), + + %% Invoking Pids + Remote = fun() -> + receive + {rabbitmq_hook,[remote_test,test,[],Target]} -> + Target ! invoked + end + end, + P = spawn(Remote), + rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), + rabbit_hooks:trigger(remote_test, []), + receive + invoked -> ok + after 100 -> + io:format("Remote hook not invoked"), + throw(timeout) + end, + passed. + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). @@ -684,3 +808,11 @@ delete_log_handlers(Handlers) -> [[] = error_logger:delete_report_handler(Handler) || Handler <- Handlers], ok. + +handle_hook(HookName, Handler, Args) -> + A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", + put(list_to_atom(A), Args). +bad_handle_hook(_, _, _) -> + bad:bad(). +extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> + handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 5ca294b7..1679ce7c 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,9 +33,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, shutdown/1, mainloop/1]). --export([send_command/2, send_command/3, - send_command_and_notify/5]). +-export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([send_command/2, send_command/3, send_command_and_signal_back/3, + send_command_and_signal_back/4, send_command_and_notify/5]). -export([internal_send_command/3, internal_send_command/5]). -import(gen_tcp). @@ -49,8 +49,12 @@ -ifdef(use_specs). -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). +-spec(send_command_and_signal_back/4 :: + (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). -spec(internal_send_command/3 :: @@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) -> channel = Channel, frame_max = FrameMax}]). +start_link(Sock, Channel, FrameMax) -> + spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}]). + mainloop(State) -> receive Message -> ?MODULE:mainloop(handle_message(Message, State)) @@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content}, ok = internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax), State; +handle_message({send_command_and_signal_back, MethodRecord, Parent}, + State = #wstate{sock = Sock, channel = Channel}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord), + Parent ! rabbit_writer_send_command_signal, + State; +handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, + State = #wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, + Content, FrameMax), + Parent ! rabbit_writer_send_command_signal, + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, @@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) -> W ! {send_command, MethodRecord, Content}, ok. +send_command_and_signal_back(W, MethodRecord, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Parent}, + ok. + +send_command_and_signal_back(W, MethodRecord, Content, Parent) -> + W ! {send_command_and_signal_back, MethodRecord, Content, Parent}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. |