summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-25 14:00:50 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-25 14:00:50 +0100
commit431942e4d954e090370f900653e730eb4005b141 (patch)
tree841aba5d1389a816f3beb412db8019742bf8204a
parent1b8a5d0be0482e66d9652fc231a8be09df2b0add (diff)
parent4322cc73ba7cbc22f1299482a9352cff77cbf704 (diff)
downloadrabbitmq-server-431942e4d954e090370f900653e730eb4005b141.tar.gz
merging in from default
-rw-r--r--.hgignore5
-rw-r--r--Makefile37
-rwxr-xr-xcalculate-relative45
-rw-r--r--docs/rabbitmq-activate-plugins.1.pod35
-rw-r--r--docs/rabbitmq-multi.1.pod2
-rw-r--r--docs/rabbitmq-server.1.pod24
-rw-r--r--docs/rabbitmq.conf.5.pod6
-rw-r--r--docs/rabbitmqctl.1.pod103
-rw-r--r--ebin/rabbit.rel7
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--packaging/RPMS/Fedora/Makefile14
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec13
-rw-r--r--packaging/common/rabbitmq-asroot-script-wrapper53
-rw-r--r--packaging/common/rabbitmq-script-wrapper31
-rw-r--r--packaging/common/rabbitmq-server.init (renamed from packaging/RPMS/Fedora/init.d)33
-rw-r--r--packaging/debs/Debian/Makefile9
-rw-r--r--packaging/debs/Debian/debian/init.d122
-rw-r--r--packaging/debs/Debian/debian/rules5
-rw-r--r--packaging/generic-unix/Makefile4
-rw-r--r--packaging/macports/net/rabbitmq-server/Portfile13
-rw-r--r--packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper12
-rw-r--r--packaging/windows/Makefile5
-rwxr-xr-xscripts/rabbitmq-activate-plugins47
-rw-r--r--scripts/rabbitmq-activate-plugins.bat49
-rwxr-xr-xscripts/rabbitmq-env53
-rwxr-xr-xscripts/rabbitmq-multi4
-rwxr-xr-xscripts/rabbitmq-multi.bat4
-rwxr-xr-xscripts/rabbitmq-server15
-rwxr-xr-xscripts/rabbitmq-server.bat21
-rwxr-xr-xscripts/rabbitmqctl4
-rwxr-xr-xscripts/rabbitmqctl.bat4
-rw-r--r--src/gen_server2.erl364
-rw-r--r--src/priority_queue.erl40
-rw-r--r--src/rabbit.erl18
-rw-r--r--src/rabbit_alarm.erl52
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_basic.erl25
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_control.erl2
-rw-r--r--src/rabbit_guid.erl26
-rw-r--r--src/rabbit_hooks.erl73
-rw-r--r--src/rabbit_memsup.erl142
-rw-r--r--src/rabbit_memsup_darwin.erl88
-rw-r--r--src/rabbit_memsup_linux.erl115
-rw-r--r--src/rabbit_misc.erl66
-rw-r--r--src/rabbit_mnesia.erl97
-rw-r--r--src/rabbit_plugin_activator.erl198
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_tests.erl172
-rw-r--r--src/rabbit_writer.erl36
51 files changed, 1744 insertions, 584 deletions
diff --git a/.hgignore b/.hgignore
index 8911cac7..839f1601 100644
--- a/.hgignore
+++ b/.hgignore
@@ -12,6 +12,11 @@ syntax: regexp
^src/rabbit_framing.erl$
^rabbit.plt$
^ebin/rabbit.app$
+^ebin/rabbit.rel$
+^ebin/rabbit.boot$
+^ebin/rabbit.script$
+^plugins/
+^priv/plugins/
^packaging/RPMS/Fedora/(BUILD|RPMS|SOURCES|SPECS|SRPMS)$
^packaging/debs/Debian/rabbitmq-server_.*\.(dsc|(diff|tar)\.gz|deb|changes)$
diff --git a/Makefile b/Makefile
index cde58c3b..f0702756 100644
--- a/Makefile
+++ b/Makefile
@@ -20,10 +20,10 @@ PYTHON=python
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
-# only available in R12B-5 upwards
+# only available in R13B upwards (R13B is eshell 5.7.1)
#
# NB: the test assumes that version number will only contain single digits
-USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.4" ]; then echo "true"; else echo "false"; fi)
+USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
-# for the moment we don't use boot files because they introduce a
-# dependency on particular versions of OTP applications
-#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
@@ -101,8 +98,8 @@ run-tests: all
start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
- RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS)" \
- ./scripts/rabbitmq-server -detached; sleep 1
+ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
+ ./scripts/rabbitmq-server ; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
@@ -116,8 +113,11 @@ force-snapshot: all
stop-node:
-$(ERL_CALL) -q
+# code coverage will be created for subdirectory "ebin" of COVER_DIR
+COVER_DIR=.
+
start-cover: all
- echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL)
+ echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
@@ -134,10 +134,10 @@ srcdist: distclean
cp README.in $(TARGET_SRC_DIR)/README
elinks -dump -no-references -no-numbering $(WEB_URL)build-server.html \
>> $(TARGET_SRC_DIR)/BUILD
- sed -i.save 's/%%VERSION%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
+ sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
- cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
+ cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
cp -r docs $(TARGET_SRC_DIR)
@@ -148,7 +148,7 @@ srcdist: distclean
rm -rf $(TARGET_SRC_DIR)
distclean: clean
- make -C $(AMQP_CODEGEN_DIR) distclean
+ $(MAKE) -C $(AMQP_CODEGEN_DIR) distclean
rm -rf dist
find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \;
@@ -163,7 +163,8 @@ distclean: clean
docs_all: $(MANPAGES)
-install: all docs_all
+install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
+install: all docs_all install_dirs
@[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false)
@[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false)
@[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false)
@@ -172,13 +173,17 @@ install: all docs_all
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
- mkdir -p $(SBIN_DIR)
- cp scripts/rabbitmq-server $(SBIN_DIR)
- cp scripts/rabbitmqctl $(SBIN_DIR)
- cp scripts/rabbitmq-multi $(SBIN_DIR)
+ for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \
+ cp scripts/$$script $(TARGET_DIR)/sbin; \
+ [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
+ done
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
for manpage in docs/*.$$section.pod; do \
cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \
done; \
done
+
+install_dirs:
+ mkdir -p $(SBIN_DIR)
+ mkdir -p $(TARGET_DIR)/sbin
diff --git a/calculate-relative b/calculate-relative
new file mode 100755
index 00000000..3af18e8f
--- /dev/null
+++ b/calculate-relative
@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+#
+# relpath.py
+# R.Barran 30/08/2004
+# Retrieved from http://code.activestate.com/recipes/302594/
+
+import os
+import sys
+
+def relpath(target, base=os.curdir):
+ """
+ Return a relative path to the target from either the current dir or an optional base dir.
+ Base can be a directory specified either as absolute or relative to current dir.
+ """
+
+ if not os.path.exists(target):
+ raise OSError, 'Target does not exist: '+target
+
+ if not os.path.isdir(base):
+ raise OSError, 'Base is not a directory or does not exist: '+base
+
+ base_list = (os.path.abspath(base)).split(os.sep)
+ target_list = (os.path.abspath(target)).split(os.sep)
+
+ # On the windows platform the target may be on a completely different drive from the base.
+ if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]:
+ raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper()
+
+ # Starting from the filepath root, work out how much of the filepath is
+ # shared by base and target.
+ for i in range(min(len(base_list), len(target_list))):
+ if base_list[i] <> target_list[i]: break
+ else:
+ # If we broke out of the loop, i is pointing to the first differing path elements.
+ # If we didn't break out of the loop, i is pointing to identical path elements.
+ # Increment i so that in all cases it points to the first differing path elements.
+ i+=1
+
+ rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:]
+ if (len(rel_list) == 0):
+ return "."
+ return os.path.join(*rel_list)
+
+if __name__ == "__main__":
+ print(relpath(sys.argv[1], sys.argv[2]))
diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod
new file mode 100644
index 00000000..6bf9f6c4
--- /dev/null
+++ b/docs/rabbitmq-activate-plugins.1.pod
@@ -0,0 +1,35 @@
+=head1 NAME
+
+rabbitmq-activate-plugins - command line tool for activating plugins in a RabbitMQ broker
+
+=head1 SYNOPSIS
+
+rabbitmq-activate-plugins
+
+=head1 DESCRIPTION
+
+RabbitMQ is an implementation of AMQP, the emerging standard for high
+performance enterprise messaging. The RabbitMQ server is a robust and
+scalable implementation of an AMQP broker.
+
+rabbitmq-activate-plugins is a command line tool for activating plugins installed
+into the broker's plugins directory.
+
+=head1 EXAMPLES
+
+To activate all of the installed plugins in the current RabbitMQ install,
+execute:
+
+ rabbitmq-activate-plugins
+
+=head1 SEE ALSO
+
+rabbitmq.conf(5), rabbitmq-multi(1), rabbitmq-server(1), rabbitmqctl(1)
+
+=head1 AUTHOR
+
+The RabbitMQ Team <info@rabbitmq.com>
+
+=head1 REFERENCES
+
+RabbitMQ Web Site: http://www.rabbitmq.com
diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod
index 23fd96ed..63848756 100644
--- a/docs/rabbitmq-multi.1.pod
+++ b/docs/rabbitmq-multi.1.pod
@@ -21,7 +21,7 @@ See also rabbitmq-server(1) for configuration information.
start_all I<count>
start count nodes with unique names, listening on all IP addresses
- and on sequential ports starting from 5672.
+and on sequential ports starting from 5672.
status
print the status of all running RabbitMQ nodes
diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod
index 99a7cecc..04062b1a 100644
--- a/docs/rabbitmq-server.1.pod
+++ b/docs/rabbitmq-server.1.pod
@@ -21,34 +21,32 @@ process or use rabbitmqctl(1).
=head1 ENVIRONMENT
B<RABBITMQ_MNESIA_BASE>
- Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory
- where Mnesia database files should be placed.
+ Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory
+where Mnesia database files should be placed.
B<RABBITMQ_LOG_BASE>
Defaults to /var/log/rabbitmq. Log files generated by the server
- will be placed in this directory.
+will be placed in this directory.
B<RABBITMQ_NODENAME>
Defaults to rabbit. This can be useful if you want to run more
- than one node per machine - B<RABBITMQ_NODENAME> should be unique
- per erlang-node-and-machine combination. See clustering on a
- single machine guide at
- http://www.rabbitmq.com/clustering.html#single-machine for
- details.
+than one node per machine - B<RABBITMQ_NODENAME> should be unique per
+erlang-node-and-machine combination. See clustering on a single
+machine guide at
+http://www.rabbitmq.com/clustering.html#single-machine for details.
B<RABBITMQ_NODE_IP_ADDRESS>
Defaults to 0.0.0.0. This can be changed if you only want to bind
- to one network interface.
+to one network interface.
B<RABBITMQ_NODE_PORT>
Defaults to 5672.
B<RABBITMQ_CLUSTER_CONFIG_FILE>
Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is
- present it is used by the server to auto-configure a RabbitMQ
- cluster.
- See the clustering guide at http://www.rabbitmq.com/clustering.html
- for details.
+present it is used by the server to auto-configure a RabbitMQ cluster.
+See the clustering guide at http://www.rabbitmq.com/clustering.html
+for details.
=head1 OPTIONS
diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod
index 9b2536c3..4d522163 100644
--- a/docs/rabbitmq.conf.5.pod
+++ b/docs/rabbitmq.conf.5.pod
@@ -18,12 +18,12 @@ built-in default values. For example, for the B<RABBITMQ_NODENAME> setting,
B<RABBITMQ_NODENAME>
from the environment is checked first. If it is absent or equal to
- the empty string, then
+the empty string, then
B<NODENAME>
from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent
- or set equal to the empty string then the default value from
- the startup script is used.
+or set equal to the empty string then the default value from the
+startup script is used.
The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
environment variable names, with the B<RABBITMQ_> prefix removed:
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 42156896..58fbb100 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -20,16 +20,16 @@ It performs all actions by connecting to one of the broker's nodes.
B<-n> I<node>
default node is C<rabbit@server>, where server is the local host.
- On a host named C<server.example.com>, the node name of the
- RabbitMQ Erlang node will usually be rabbit@server (unless
- RABBITMQ_NODENAME has been set to some non-default value at broker
- startup time). The output of hostname -s is usually the correct
- suffix to use after the "@" sign. See rabbitmq-server(1) for
- details of configuring the RabbitMQ broker.
+On a host named C<server.example.com>, the node name of the RabbitMQ
+Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME
+has been set to some non-default value at broker startup time). The
+output of hostname -s is usually the correct suffix to use after the
+"@" sign. See rabbitmq-server(1) for details of configuring the
+RabbitMQ broker.
B<-q>
- quiet output mode is selected with the B<-q> flag. Informational
- messages are suppressed when quiet mode is in effect.
+ quiet output mode is selected with the B<-q> flag. Informational
+messages are suppressed when quiet mode is in effect.
=head1 COMMANDS
@@ -40,53 +40,51 @@ stop
stop_app
stop the RabbitMQ application, leaving the Erlang node running.
- This command is typically run prior to performing other management
- actions that require the RabbitMQ application to be stopped,
- e.g. I<reset>.
+This command is typically run prior to performing other management
+actions that require the RabbitMQ application to be stopped,
+e.g. I<reset>.
start_app
start the RabbitMQ application.
This command is typically run prior to performing other management
- actions that require the RabbitMQ application to be stopped,
- e.g. I<reset>.
+actions that require the RabbitMQ application to be stopped,
+e.g. I<reset>.
status
display various information about the RabbitMQ broker, such as
- whether the RabbitMQ application on the current node, its version
- number, what nodes are part of the broker, which of these are
- running.
+whether the RabbitMQ application on the current node, its version
+number, what nodes are part of the broker, which of these are running.
-force
+reset
return a RabbitMQ node to its virgin state.
Removes the node from any cluster it belongs to, removes all data
- from the management database, such as configured users, vhosts and
- deletes all persistent messages.
+from the management database, such as configured users, vhosts and
+deletes all persistent messages.
force_reset
- the same as I<force> command, but resets the node unconditionally,
- regardless of the current management database state and cluster
- configuration.
+ the same as I<reset> command, but resets the node unconditionally,
+regardless of the current management database state and cluster
+configuration.
It should only be used as a last resort if the database or cluster
- configuration has been corrupted.
+configuration has been corrupted.
rotate_logs [suffix]
instruct the RabbitMQ node to rotate the log files. The RabbitMQ
- broker will attempt to append the current contents of the log file
- to the file with the name composed of the original name and the
- suffix. It will create a new file if such a file does not already
- exist. When no I<suffix> is specified, the empty log file is
- simply created at the original location; no rotation takes place.
- When an error occurs while appending the contents of the old log
- file, the operation behaves in the same way as if no I<suffix> was
- specified.
+broker will attempt to append the current contents of the log file to
+the file with the name composed of the original name and the
+suffix. It will create a new file if such a file does not already
+exist. When no I<suffix> is specified, the empty log file is simply
+created at the original location; no rotation takes place. When an
+error occurs while appending the contents of the old log file, the
+operation behaves in the same way as if no I<suffix> was specified.
This command might be helpful when you are e.g. writing your own
- logrotate script and you do not want to restart the RabbitMQ node.
+logrotate script and you do not want to restart the RabbitMQ node.
cluster I<clusternode> ...
instruct the node to become member of a cluster with the specified
- nodes determined by I<clusternode> option(s).
- See http://www.rabbitmq.com/clustering.html for more information
- about clustering.
+nodes determined by I<clusternode> option(s). See
+http://www.rabbitmq.com/clustering.html for more information about
+clustering.
=head2 USER MANAGEMENT
@@ -110,35 +108,35 @@ add_vhost I<vhostpath>
delete_vhost I<vhostpath>
delete a virtual host I<vhostpath>.
That command deletes also all its exchanges, queues and user
- mappings.
+mappings.
list_vhosts
list all virtual hosts.
set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
set the permissions for the user named I<username> in the virtual
- host I<vhostpath>, granting 'configure', 'write' and 'read' access
- to resources with names matching the first, second and third
- I<regexp>, respectively.
+host I<vhostpath>, granting 'configure', 'write' and 'read' access to
+resources with names matching the first, second and third I<regexp>,
+respectively.
clear_permissions [-p I<vhostpath>] I<username>
remove the permissions for the user named I<username> in the
- virtual host I<vhostpath>.
+virtual host I<vhostpath>.
list_permissions [-p I<vhostpath>]
list all the users and their permissions in the virtual host
- I<vhostpath>.
+I<vhostpath>.
list_user_permissions I<username>
list the permissions of the user named I<username> across all
- virtual hosts.
+virtual hosts.
=head2 SERVER STATUS
list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
list queue information by virtual host. If no I<queueinfoitem>s
- are specified then then name and number of messages is displayed
- for each queue.
+are specified then then name and number of messages is displayed for
+each queue.
=head3 Queue information items
@@ -163,8 +161,7 @@ messages_ready
number of messages ready to be delivered to clients
messages_unacknowledged
- number of messages delivered to clients but not yet
- acknowledged
+ number of messages delivered to clients but not yet acknowledged
messages_uncommitted
number of messages published in as yet uncommitted transactions
@@ -174,7 +171,7 @@ messages
acks_uncommitted
number of acknowledgements received in as yet uncommitted
- transactions
+transactions
consumers
number of consumers
@@ -184,14 +181,14 @@ transactions
memory
bytes of memory consumed by the Erlang process for the queue,
- including stack, heap and internal structures
+including stack, heap and internal structures
=back
list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...]
list exchange information by virtual host. If no
- I<exchangeinfoitem>s are specified then name and type is displayed
- for each exchange.
+I<exchangeinfoitem>s are specified then name and type is displayed for
+each exchange.
=head3 Exchange information items
@@ -216,11 +213,11 @@ arguments
list_bindings [-p I<vhostpath>]
list bindings by virtual host. Each line contains exchange name,
- routing key and queue name (all URL encoded) and arguments.
+routing key and queue name (all URL encoded) and arguments.
list_connections [I<connectioninfoitem> ...]
list connection information. If no I<connectioninfoitem>s are
- specified then the user, peer address and peer port are displayed.
+specified then the user, peer address and peer port are displayed.
=head3 Connection information items
@@ -243,7 +240,7 @@ peer_port
state
connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
- B<running>, B<closing>, B<closed>)
+B<running>, B<closing>, B<closed>)
channels
number of channels using the connection
diff --git a/ebin/rabbit.rel b/ebin/rabbit.rel
deleted file mode 100644
index c2d2067b..00000000
--- a/ebin/rabbit.rel
+++ /dev/null
@@ -1,7 +0,0 @@
-{release, {"rabbit", "1.1.0-alpha"}, {erts, "1.14.2"},
- [{rabbit, "1.1.0-alpha"},
- {mnesia, "4.3.4"},
- {os_mon, "2.1.2"},
- {sasl, "2.1.5"},
- {stdlib, "1.14.4"},
- {kernel, "2.11.4"}]}.
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 965da130..6fc6e464 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -1,7 +1,7 @@
{application, rabbit, %% -*- erlang -*-
[{description, "RabbitMQ"},
{id, "RabbitMQ"},
- {vsn, "%%VERSION%%"},
+ {vsn, "%%VSN%%"},
{modules, []},
{registered, [rabbit_amqqueue_sup,
rabbit_log,
diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile
index c74d4533..fa2844fd 100644
--- a/packaging/RPMS/Fedora/Makefile
+++ b/packaging/RPMS/Fedora/Makefile
@@ -1,7 +1,8 @@
-VERSION=0.0.0
-SOURCE_TARBALL_DIR=../../../dist
+TARBALL_DIR=../../../dist
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
-TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
+VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
TOP_DIR=$(shell pwd)
#Under debian we do not want to check build dependencies, since that
#only checks build-dependencies using rpms, not debs
@@ -23,13 +24,16 @@ rpms: clean server
prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
- cp $(TOP_DIR)/$(TARBALL) SOURCES
+ cp $(TARBALL_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
- cp init.d SOURCES/rabbitmq-server.init
cp ${COMMON_DIR}/* SOURCES/
+ sed -i \
+ -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
+ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
+ SOURCES/rabbitmq-server.init
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index 9e7c4bfb..7f442831 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v
Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
+Source4: rabbitmq-asroot-script-wrapper
URL: http://www.rabbitmq.com/
BuildRequires: erlang, python-simplejson
Requires: erlang, logrotate
@@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
-%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
+%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version}
%define _rabbit_libdir %{_libdir}/rabbitmq
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
+%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@@ -34,11 +36,9 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
-
-# The rabbitmq build needs escript, which is missing from /usr/bin in
-# some versions of the erlang RPM. See
-# <https://bugzilla.redhat.com/show_bug.cgi?id=481302>
-PATH=%{_libdir}/erlang/bin:$PATH make %{?_smp_mflags}
+cp %{S:4} %{_rabbit_asroot_wrapper}
+sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper}
+make %{?_smp_mflags}
%install
rm -rf %{buildroot}
@@ -55,6 +55,7 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
+install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server
diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper
new file mode 100644
index 00000000..0dd1c0fb
--- /dev/null
+++ b/packaging/common/rabbitmq-asroot-script-wrapper
@@ -0,0 +1,53 @@
+#!/bin/bash
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+# Escape spaces and quotes, because shell is revolting.
+for arg in "$@" ; do
+ # Escape quotes in parameters, so that they're passed through cleanly.
+ arg=$(sed -e 's/"/\\"/' <<-END
+ $arg
+ END
+ )
+ CMDLINE="${CMDLINE} \"${arg}\""
+done
+
+cd /var/lib/rabbitmq
+
+SCRIPT=`basename $0`
+
+if [ `id -u` = 0 ] ; then
+ /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}
+else
+ echo -e "\nOnly root should run ${SCRIPT}\n"
+ exit 1
+fi
+
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 296a77d1..f1a9b1ff 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -1,4 +1,35 @@
#!/bin/bash
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.
diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init
index 77a6a89a..e71562f8 100644
--- a/packaging/RPMS/Fedora/init.d
+++ b/packaging/common/rabbitmq-server.init
@@ -8,10 +8,10 @@
### BEGIN INIT INFO
# Provides: rabbitmq-server
-# Default-Start:
-# Default-Stop:
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
+# Default-Start:
+# Default-Stop:
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
@@ -24,13 +24,14 @@ USER=rabbitmq
NODE_COUNT=1
ROTATE_SUFFIX=
-LOCK_FILE=/var/lock/subsys/$NAME
+DEFAULTS_FILE= # This is filled in when building packages
+LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
# Include rabbitmq defaults if available
-if [ -f /etc/sysconfig/rabbitmq ] ; then
- . /etc/sysconfig/rabbitmq
+if [ -f "$DEFAULTS_FILE" ] ; then
+ . $DEFAULTS_FILE
fi
RETVAL=0
@@ -41,7 +42,8 @@ start_rabbitmq () {
$DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
case "$?" in
0)
- echo SUCCESS && touch $LOCK_FILE
+ echo SUCCESS
+ [ -n "$LOCK_FILE" ] && touch $LOCK_FILE
RETVAL=0
;;
1)
@@ -52,7 +54,7 @@ start_rabbitmq () {
echo FAILED - check /var/log/rabbitmq/startup_log, _err
RETVAL=1
;;
- esac
+ esac
set -e
}
@@ -62,10 +64,12 @@ stop_rabbitmq () {
if [ $RETVAL = 0 ] ; then
$DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
RETVAL=$?
- if [ $RETVAL != 0 ] ; then
- echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
+ if [ $RETVAL = 0 ] ; then
+ # Try to stop epmd if run by the rabbitmq user
+ pkill -u rabbitmq epmd || :
+ [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE
else
- rm -rf $LOCK_FILE
+ echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
else
echo No nodes running
@@ -119,19 +123,14 @@ case "$1" in
echo -n "Rotating log files for $DESC: "
rotate_logs_rabbitmq
;;
- force-reload|reload|restart)
- echo -n "Restarting $DESC: "
- restart_rabbitmq
- echo "$NAME."
- ;;
- condrestart|try-restart)
+ force-reload|reload|restart|condrestart|try-restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;;
*)
echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2
- RETVAL=2
+ RETVAL=1
;;
esac
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 67fabae0..dafaf9ce 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -1,8 +1,9 @@
TARBALL_DIR=../../../dist
-TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz))
+TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
-DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
+
+DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
UNPACKED_DIR=rabbitmq-server-$(VERSION)
PACKAGENAME=rabbitmq-server
SIGNING_KEY_ID=056E8E56
@@ -21,6 +22,10 @@ package: clean
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
+ sed -i \
+ -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
+ -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
+ $(UNPACKED_DIR)/debian/rabbitmq-server.init
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d
deleted file mode 100644
index a35a60ec..00000000
--- a/packaging/debs/Debian/debian/init.d
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/bin/sh
-### BEGIN INIT INFO
-# Provides: rabbitmq
-# Required-Start: $remote_fs $network
-# Required-Stop: $remote_fs $network
-# Default-Start: 2 3 4 5
-# Default-Stop: 0 1 6
-# Description: RabbitMQ broker
-# Short-Description: Enable AMQP service provided by RabbitMQ broker
-### END INIT INFO
-
-PATH=/sbin:/usr/sbin:/bin:/usr/bin
-DAEMON=/usr/sbin/rabbitmq-multi
-NAME=rabbitmq-server
-DESC=rabbitmq-server
-USER=rabbitmq
-NODE_COUNT=1
-ROTATE_SUFFIX=
-
-test -x $DAEMON || exit 0
-
-# Include rabbitmq defaults if available
-if [ -f /etc/default/rabbitmq ] ; then
- . /etc/default/rabbitmq
-fi
-
-RETVAL=0
-set -e
-
-start_rabbitmq () {
- set +e
- $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
- case "$?" in
- 0)
- echo SUCCESS
- RETVAL=0
- ;;
- 1)
- echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\}
- RETVAL=1
- ;;
- *)
- echo FAILED - check /var/log/rabbitmq/startup_log, _err
- RETVAL=1
- ;;
- esac
- set -e
-}
-
-stop_rabbitmq () {
- set +e
- status_rabbitmq quiet
- if [ $RETVAL = 0 ] ; then
- $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
- RETVAL=$?
- if [ $RETVAL != 0 ] ; then
- echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
- fi
- else
- echo No nodes running
- RETVAL=0
- fi
- set -e
-}
-
-status_rabbitmq() {
- set +e
- if [ "$1" != "quiet" ] ; then
- $DAEMON status 2>&1
- else
- $DAEMON status > /dev/null 2>&1
- fi
- if [ $? != 0 ] ; then
- RETVAL=1
- fi
- set -e
-}
-
-rotate_logs_rabbitmq() {
- set +e
- $DAEMON rotate_logs ${ROTATE_SUFFIX}
- if [ $? != 0 ] ; then
- RETVAL=1
- fi
- set -e
-}
-
-restart_rabbitmq() {
- stop_rabbitmq
- start_rabbitmq
-}
-
-case "$1" in
- start)
- echo -n "Starting $DESC: "
- start_rabbitmq
- echo "$NAME."
- ;;
- stop)
- echo -n "Stopping $DESC: "
- stop_rabbitmq
- echo "$NAME."
- ;;
- status)
- status_rabbitmq
- ;;
- rotate-logs)
- echo -n "Rotating log files for $DESC: "
- rotate_logs_rabbitmq
- ;;
- force-reload|restart)
- echo -n "Restarting $DESC: "
- restart_rabbitmq
- echo "$NAME."
- ;;
- *)
- echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2
- RETVAL=1
- ;;
-esac
-
-exit $RETVAL
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index 31904851..365eea6e 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -3,7 +3,7 @@
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/makefile.mk
-RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/
+RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/
RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/
DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/
@@ -17,3 +17,6 @@ install/rabbitmq-server::
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
+ for script in rabbitmq-activate-plugins; do \
+ install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
+ done
diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile
index b3988696..4eade6c7 100644
--- a/packaging/generic-unix/Makefile
+++ b/packaging/generic-unix/Makefile
@@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
- make -C ../.. VERSION=$(VERSION) srcdist
+ $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
- make -C $(SOURCE_DIR) \
+ $(MAKE) -C $(SOURCE_DIR) \
TARGET_DIR=`pwd`/$(TARGET_DIR) \
SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \
diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile
index b8096d20..1826d5c4 100644
--- a/packaging/macports/net/rabbitmq-server/Portfile
+++ b/packaging/macports/net/rabbitmq-server/Portfile
@@ -42,7 +42,7 @@ use_parallel_build yes
build.args PYTHON=${prefix}/bin/python2.5
destroot.destdir \
- TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
+ TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \
SBIN_DIR=${sbindir} \
MAN_DIR=${destroot}${prefix}/share/man
@@ -61,9 +61,7 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
- ${sbindir}/rabbitmq-multi \
- ${sbindir}/rabbitmq-server \
- ${sbindir}/rabbitmqctl
+ ${sbindir}/rabbitmq-env
reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
${sbindir}/rabbitmq-multi \
${sbindir}/rabbitmq-server \
@@ -83,14 +81,19 @@ post-destroot {
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-multi
+ xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \
+ ${wrappersbin}/rabbitmq-activate-plugins
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
${wrappersbin}/rabbitmq-multi
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
${wrappersbin}/rabbitmq-multi
+ reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
+ ${wrappersbin}/rabbitmq-activate-plugins
+ reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
+ ${wrappersbin}/rabbitmq-activate-plugins
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
-
}
pre-install {
diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
new file mode 100644
index 00000000..c4488dcb
--- /dev/null
+++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper
@@ -0,0 +1,12 @@
+#!/bin/bash
+cd /var/lib/rabbitmq
+
+SCRIPT=`basename $0`
+
+if [ `id -u` = 0 ] ; then
+ /usr/lib/rabbitmq/bin/${SCRIPT} "$@"
+else
+ echo -e "\nOnly root should run ${SCRIPT}\n"
+ exit 1
+fi
+
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index 59101cb2..387becb3 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -4,15 +4,16 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
- make -C ../.. VERSION=$(VERSION) srcdist
+ $(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
- make -C $(SOURCE_DIR)
+ $(MAKE) -C $(SOURCE_DIR)
mkdir $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin
+ mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
rm -f $(SOURCE_DIR)/README
diff --git a/scripts/rabbitmq-activate-plugins b/scripts/rabbitmq-activate-plugins
new file mode 100755
index 00000000..5ce64c68
--- /dev/null
+++ b/scripts/rabbitmq-activate-plugins
@@ -0,0 +1,47 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+. `dirname $0`/rabbitmq-env
+
+RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin
+[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
+[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins"
+
+exec erl \
+ -pa "$RABBITMQ_EBIN" \
+ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
+ -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
+ -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \
+ -noinput \
+ -hidden \
+ -s rabbit_plugin_activator \
+ -extra "$@"
diff --git a/scripts/rabbitmq-activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat
new file mode 100644
index 00000000..3540bf2d
--- /dev/null
+++ b/scripts/rabbitmq-activate-plugins.bat
@@ -0,0 +1,49 @@
+@echo off
+REM The contents of this file are subject to the Mozilla Public License
+REM Version 1.1 (the "License"); you may not use this file except in
+REM compliance with the License. You may obtain a copy of the License at
+REM http://www.mozilla.org/MPL/
+REM
+REM Software distributed under the License is distributed on an "AS IS"
+REM basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+REM License for the specific language governing rights and limitations
+REM under the License.
+REM
+REM The Original Code is RabbitMQ.
+REM
+REM The Initial Developers of the Original Code are LShift Ltd,
+REM Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+REM Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+REM are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+REM Technologies LLC, and Rabbit Technologies Ltd.
+REM
+REM Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+REM Ltd. Portions created by Cohesive Financial Technologies LLC are
+REM Copyright (C) 2007-2009 Cohesive Financial Technologies
+REM LLC. Portions created by Rabbit Technologies Ltd are Copyright
+REM (C) 2007-2009 Rabbit Technologies Ltd.
+REM
+REM All Rights Reserved.
+REM
+REM Contributor(s): ______________________________________.
+REM
+
+if not exist "%ERLANG_HOME%\bin\erl.exe" (
+ echo.
+ echo ******************************
+ echo ERLANG_HOME not set correctly.
+ echo ******************************
+ echo.
+ echo Please either set ERLANG_HOME to point to your Erlang installation or place the
+ echo RabbitMQ server distribution in the Erlang lib folder.
+ echo.
+ exit /B
+)
+
+set RABBITMQ_PLUGINS_DIR="%~dp0..\plugins"
+set RABBITMQ_PLUGINS_EXPAND_DIR="%~dp0..\priv\plugins"
+set RABBITMQ_EBIN_DIR="%~dp0..\ebin"
+
+"%ERLANG_HOME%\bin\erl.exe" -pa "%~dp0..\ebin" -noinput -hidden -s rabbit_plugin_activator -rabbit plugins_dir \"%RABBITMQ_PLUGINS_DIR:\=/%\" -rabbit plugins_expand_dir \"%RABBITMQ_PLUGINS_EXPAND_DIR:\=/%\" -rabbit rabbit_ebin \"%RABBITMQ_EBIN_DIR:\=/%\" -extra %*
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
new file mode 100755
index 00000000..69ddbcfe
--- /dev/null
+++ b/scripts/rabbitmq-env
@@ -0,0 +1,53 @@
+#!/bin/sh
+## The contents of this file are subject to the Mozilla Public License
+## Version 1.1 (the "License"); you may not use this file except in
+## compliance with the License. You may obtain a copy of the License at
+## http://www.mozilla.org/MPL/
+##
+## Software distributed under the License is distributed on an "AS IS"
+## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+## License for the specific language governing rights and limitations
+## under the License.
+##
+## The Original Code is RabbitMQ.
+##
+## The Initial Developers of the Original Code are LShift Ltd,
+## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+## Technologies LLC, and Rabbit Technologies Ltd.
+##
+## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+## Ltd. Portions created by Cohesive Financial Technologies LLC are
+## Copyright (C) 2007-2009 Cohesive Financial Technologies
+## LLC. Portions created by Rabbit Technologies Ltd are Copyright
+## (C) 2007-2009 Rabbit Technologies Ltd.
+##
+## All Rights Reserved.
+##
+## Contributor(s): ______________________________________.
+##
+
+# Determine where this script is really located
+SCRIPT_PATH="$0"
+while [ -h "$SCRIPT_PATH" ] ; do
+ FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null`
+ if [ "$?" != "0" ]; then
+ REL_PATH=`readlink $SCRIPT_PATH`
+ if expr "$REL_PATH" : '/.*' > /dev/null; then
+ SCRIPT_PATH="$REL_PATH"
+ else
+ SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH"
+ fi
+ else
+ SCRIPT_PATH=$FULL_PATH
+ fi
+done
+
+SCRIPT_DIR=`dirname $SCRIPT_PATH`
+RABBITMQ_HOME="${SCRIPT_DIR}/.."
+
+# Load configuration from the rabbitmq.conf file
+[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 1d0c785f..7db4cb70 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
MULTI_START_ARGS=
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
@@ -60,7 +60,7 @@ export \
set -f
exec erl \
- -pa "`dirname $0`/../ebin" \
+ -pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_MULTI_ERL_ARGS} \
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a30c0889..8abf13f1 100755
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 8502d60a..547220b4 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -41,7 +41,7 @@ LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
SERVER_START_ARGS=
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
@@ -75,16 +75,25 @@ fi
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit'
+RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
+if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then
+ RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit"
+ RABBITMQ_EBIN_PATH=""
+else
+ RABBITMQ_BOOT_FILE=start_sasl
+ RABBITMQ_EBIN_PATH="-pa ${RABBITMQ_EBIN_ROOT}"
+fi
+
# we need to turn off path expansion because some of the vars, notably
# RABBITMQ_SERVER_ERL_ARGS, contain terms that look like globs and
# there is no other way of preventing their expansion.
set -f
exec erl \
- -pa "`dirname $0`/../ebin" \
+ ${RABBITMQ_EBIN_PATH} \
${RABBITMQ_START_RABBIT} \
-sname ${RABBITMQ_NODENAME} \
- -boot start_sasl \
+ -boot ${RABBITMQ_BOOT_FILE} \
+W w \
${RABBITMQ_SERVER_ERL_ARGS} \
-rabbit tcp_listeners '[{"'${RABBITMQ_NODE_IP_ADDRESS}'", '${RABBITMQ_NODE_PORT}'}]' \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 9915727b..a784fee3 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_NODE_PORT=5672
)
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
@@ -84,10 +80,10 @@ set LOGS_BACKUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%.log%BACKUP_EXTENSION%"
set SASL_LOGS_BAKCUP="%RABBITMQ_BASE%\log\%RABBITMQ_NODENAME%-sasl.log%BACKUP_EXTENSION%"
if exist %LOGS% (
- type %LOGS% >> %LOGS_BACKUP%
+ type %LOGS% >> %LOGS_BACKUP%
)
if exist %SASL_LOGS% (
- type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
+ type %SASL_LOGS% >> %SASL_LOGS_BAKCUP%
)
rem End of log management
@@ -104,11 +100,20 @@ set CLUSTER_CONFIG=-rabbit cluster_config \""%RABBITMQ_CLUSTER_CONFIG_FILE:\=/%"
if "%RABBITMQ_MNESIA_DIR%"=="" (
set RABBITMQ_MNESIA_DIR=%RABBITMQ_MNESIA_BASE%/%RABBITMQ_NODENAME%-mnesia
)
+set RABBITMQ_EBIN_ROOT=%~dp0..\ebin
+if exist "%RABBITMQ_EBIN_ROOT%\rabbit.boot" (
+ echo Using Custom Boot File "%RABBITMQ_EBIN_ROOT%\rabbit.boot"
+ set RABBITMQ_BOOT_FILE="%RABBITMQ_EBIN_ROOT%\rabbit"
+ set RABBITMQ_EBIN_PATH=
+) else (
+ set RABBITMQ_BOOT_FILE=start_sasl
+ set RABBITMQ_EBIN_PATH=-pa "%RABBITMQ_EBIN_ROOT%"
+)
"%ERLANG_HOME%\bin\erl.exe" ^
--pa "%~dp0..\ebin" ^
+%RABBITMQ_EBIN_PATH% ^
-noinput ^
--boot start_sasl ^
+-boot %RABBITMQ_BOOT_FILE% ^
-sname %RABBITMQ_NODENAME% ^
-s rabbit ^
+W w ^
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index c57978c0..9c45e73d 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,12 +30,12 @@
## Contributor(s): ______________________________________.
##
-[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
+. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
exec erl \
- -pa "`dirname $0`/../ebin" \
+ -pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index e4dccfba..5111724f 100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
-if "%ERLANG_HOME%"=="" (
- set ERLANG_HOME=%~dp0%..\..\..
-)
-
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index ba8becfc..36fb4fa8 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1,4 +1,4 @@
-%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is gen_server2
@@ -21,6 +21,42 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
+%% 5) The callback module can optionally implement
+%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
+%% called immediately prior to and post hibernation, respectively. If
+%% handle_pre_hibernate returns {hibernate, NewState} then the process
+%% will hibernate. If the module does not implement
+%% handle_pre_hibernate/1 then the default action is to hibernate.
+%%
+%% 6) init can return a 4th arg, {backoff, InitialTimeout,
+%% MinimumTimeout, DesiredHibernatePeriod} (all in
+%% milliseconds). Then, on all callbacks which can return a timeout
+%% (including init), timeout can be 'hibernate'. When this is the
+%% case, the current timeout value will be used (initially, the
+%% InitialTimeout supplied from init). After this timeout has
+%% occurred, hibernation will occur as normal. Upon awaking, a new
+%% current timeout value will be calculated.
+%%
+%% The purpose is that the gen_server2 takes care of adjusting the
+%% current timeout value such that the process will increase the
+%% timeout value repeatedly if it is unable to sleep for the
+%% DesiredHibernatePeriod. If it is able to sleep for the
+%% DesiredHibernatePeriod it will decrease the current timeout down to
+%% the MinimumTimeout, so that the process is put to sleep sooner (and
+%% hopefully stays asleep for longer). In short, should a process
+%% using this receive a burst of messages, it should not hibernate
+%% between those messages, but as the messages become less frequent,
+%% the process will not only hibernate, it will do so sooner after
+%% each message.
+%%
+%% When using this backoff mechanism, normal timeout values (i.e. not
+%% 'hibernate') can still be used, and if they are used then the
+%% handle_info(timeout, State) will be called as normal. In this case,
+%% returning 'hibernate' from handle_info(timeout, State) will not
+%% hibernate the process immediately, as it would if backoff wasn't
+%% being used. Instead it'll wait for the current timeout as described
+%% above.
+
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -55,6 +91,7 @@
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
+%%% {ok, State, Timeout, Backoff}
%%% ignore
%%% {stop, Reason}
%%%
@@ -86,6 +123,17 @@
%%%
%%% ==> ok
%%%
+%%% handle_pre_hibernate(State)
+%%%
+%%% ==> {hibernate, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% handle_post_hibernate(State)
+%%%
+%%% ==> {noreply, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% The work flow (of the server) can be described as follows:
%%%
@@ -116,7 +164,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -290,7 +338,7 @@ multi_call(Nodes, Name, Req, Timeout)
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
%%
%% Description: Makes an existing process into a gen_server.
%% The calling process will enter the gen_server receive
@@ -301,20 +349,30 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), infinity).
+ enter_loop(Mod, Options, State, self(), infinity, undefined).
+
+enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity);
+ enter_loop(Mod, Options, State, ServerName, infinity, undefined);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), Timeout).
+ enter_loop(Mod, Options, State, self(), Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+ Backoff1 = extend_backoff(Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -329,23 +387,37 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
-init_it(Starter, Parent, Name, Mod, Args, Options) ->
+init_it(Starter, Parent, Name0, Mod, Args, Options) ->
+ Name = name(Name0),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ Backoff1 = extend_backoff(Backoff),
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
+ %% For consistency, we must make sure that the
+ %% registered name (if any) is unregistered before
+ %% the parent process is notified about the failure.
+ %% (Otherwise, the parent process could get
+ %% an 'already_started' error if it immediately
+ %% tried starting the process again.)
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
ignore ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, ignore),
exit(normal);
{'EXIT', Reason} ->
+ unregister_name(Name0),
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
Else ->
@@ -354,33 +426,159 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
exit(Error)
end.
+name({local,Name}) -> Name;
+name({global,Name}) -> Name;
+%% name(Pid) when is_pid(Pid) -> Pid;
+%% when R11 goes away, drop the line beneath and uncomment the line above
+name(Name) -> Name.
+
+unregister_name({local,Name}) ->
+ _ = (catch unregister(Name));
+unregister_name({global,Name}) ->
+ _ = global:unregister_name(Name);
+unregister_name(Pid) when is_pid(Pid) ->
+ Pid.
+
+extend_backoff(undefined) ->
+ undefined;
+extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ pre_hibernate(Parent, Name, State, Mod, undefined, Queue, Debug);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
receive
- Input -> loop(Parent, Name, State, Mod,
- Time, in(Input, Queue), Debug)
- after 0 ->
- case priority_queue:out(Queue) of
- {{value, Msg}, Queue1} ->
- process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Msg);
- {empty, Queue1} ->
- receive
- Input ->
- loop(Parent, Name, State, Mod,
- Time, in(Input, Queue1), Debug)
- after Time ->
- process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, timeout)
+ Input -> drain(in(Input, Queue))
+ after 0 -> Queue
+ end.
+
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ case priority_queue:out(Queue) of
+ {{value, Msg}, Queue1} ->
+ process_msg(Parent, Name, State, Mod,
+ Time, TimeoutState, Queue1, Debug, Msg);
+ {empty, Queue1} ->
+ {Time1, HibOnTimeout}
+ = case {Time, TimeoutState} of
+ {hibernate, {backoff, Current, _Min, _Desired, _RSt}} ->
+ {Current, true};
+ {hibernate, _} ->
+ %% wake_hib/7 will set Time to hibernate. If
+ %% we were woken and didn't receive a msg
+ %% then we will get here and need a sensible
+ %% value for Time1, otherwise we crash.
+ %% R13B1 always waits infinitely when waking
+ %% from hibernation, so that's what we do
+ %% here too.
+ {infinity, false};
+ _ -> {Time, false}
+ end,
+ receive
+ Input ->
+ %% Time could be 'hibernate' here, so *don't* call loop
+ process_next_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ drain(in(Input, Queue1)), Debug)
+ after Time1 ->
+ case HibOnTimeout of
+ true ->
+ pre_hibernate(
+ Parent, Name, State, Mod, TimeoutState, Queue1,
+ Debug);
+ false ->
+ process_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ Queue1, Debug, timeout)
end
end
end.
+wake_hib(Parent, Name, State, Mod, TS, Queue, Debug) ->
+ TimeoutState1 = case TS of
+ undefined ->
+ undefined;
+ {SleptAt, TimeoutState} ->
+ adjust_timeout_state(SleptAt, now(), TimeoutState)
+ end,
+ post_hibernate(Parent, Name, State, Mod, TimeoutState1,
+ drain(Queue), Debug).
+
+hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ TS = case TimeoutState of
+ undefined -> undefined;
+ {backoff, _, _, _, _} -> {now(), TimeoutState}
+ end,
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ TS, Queue, Debug]).
+
+pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_pre_hibernate, 1) of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
+ Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, pre_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug)
+ end.
+
+post_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case erlang:function_exported(Mod, handle_post_hibernate, 1) of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity,
+ TimeoutState, Queue, Debug);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time,
+ TimeoutState, Queue, Debug);
+ Reply ->
+ handle_common_termination(Reply, Name, post_hibernate,
+ Mod, State, Debug)
+ end;
+ false ->
+ %% use hibernate here, not infinity. This matches
+ %% R13B. The key is that we should be able to get through
+ %% to process_msg calling sys:handle_system_msg with Time
+ %% still set to hibernate, iff that msg is the very msg
+ %% that woke us up (or the first msg we receive after
+ %% waking up).
+ process_next_msg(Parent, Name, State, Mod, hibernate,
+ TimeoutState, Queue, Debug)
+ end.
+
+adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
+ DesiredHibPeriod, RandomState}) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ {Extra, RandomState1} = random:uniform_s(Base, RandomState),
+ CurrentTO1 = Base + Extra,
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
+
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
@@ -388,19 +586,25 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
in(Input, Queue) ->
priority_queue:in(Input, Queue).
-process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) ->
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
+ Debug, Msg) ->
case Msg of
{system, From, Req} ->
- sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, Queue]);
+ sys:handle_system_msg
+ (Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, TimeoutState, Queue]);
+ %% gen_server puts Hib on the end as the 7th arg, but that
+ %% version of the function seems not to be documented so
+ %% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue);
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue,
+ Debug1)
end.
%%% ---------------------------------------------------
@@ -598,87 +802,95 @@ dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
- Other -> handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue)
+ Other -> handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
reply(Name, From, Reply, NState, Debug),
exit(R);
Other ->
- handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue, Debug)
+ handle_common_reply(Other, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, _Time, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply,
- Parent, Name, Msg, Mod, State, Queue, Debug).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue, Debug).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State,
+ TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
- {stop, Reason, NState} ->
- terminate(Reason, Name, Msg, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, Msg, Mod, State, []);
- _ ->
- terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, [])
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue,
+ Debug) ->
case Reply of
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue,
+ Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
+ _ ->
+ handle_common_termination(Reply, Name, Msg, Mod, State, Debug)
+ end.
+
+handle_common_termination(Reply, Name, Msg, Mod, State, Debug) ->
+ case Reply of
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -696,16 +908,24 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
- loop(Parent, Name, State, Mod, Time, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+-ifdef(use_specs).
+-spec system_terminate(_, _, _, [_]) -> no_return().
+-endif.
+
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time,
+ _TimeoutState, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
-system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module,
+ OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
- Else -> Else
+ {ok, NewState} ->
+ {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
+ Else ->
+ Else
end.
%%-----------------------------------------------------------------
@@ -747,6 +967,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) ->
exit(normal);
shutdown ->
exit(shutdown);
+ {shutdown,_}=Shutdown ->
+ exit(Shutdown);
_ ->
error_info(Reason, Name, Msg, State, Debug),
exit(Reason)
@@ -871,8 +1093,8 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
- StatusData,
+ [PDict, SysState, Parent, Debug,
+ [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
is_atom(Name) ->
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c4..c74b39a9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -73,6 +74,7 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 29e23b6d..6ad22e7a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -133,6 +133,7 @@ start(normal, []) ->
{"core processes",
fun () ->
ok = start_child(rabbit_log),
+ ok = rabbit_hooks:start(),
ok = rabbit_amqqueue:start(),
@@ -222,8 +223,21 @@ log_location(Type) ->
print_banner() ->
{ok, Product} = application:get_key(id),
{ok, Version} = application:get_key(vsn),
- io:format("~s ~s (AMQP ~p-~p)~n~s~n~s~n~n",
- [Product, Version,
+ ProductLen = string:len(Product),
+ io:format("~n"
+ "+---+ +---+~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| | | |~n"
+ "| +---+ +-------+~n"
+ "| |~n"
+ "| ~s +---+ |~n"
+ "| | | |~n"
+ "| ~s +---+ |~n"
+ "| |~n"
+ "+-------------------+~n"
+ "AMQP ~p-~p~n~s~n~s~n~n",
+ [Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 21999f16..309c9a0e 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -41,7 +41,7 @@
-define(MEMSUP_CHECK_INTERVAL, 1000).
%% OSes on which we know memory alarms to be trustworthy
--define(SUPPORTED_OS, [{unix, linux}]).
+-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-record(alarms, {alertees, system_memory_high_watermark = false}).
@@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
- %% memsup doesn't take account of buffers or cache when
- %% considering "free" memory - therefore on Linux we can
- %% get memory alarms very easily without any pressure
- %% existing on memory at all. Therefore we need to use
- %% our own simple memory monitor.
- %%
- {unix, linux} -> rabbit_memsup_linux;
-
- %% Start memsup programmatically rather than via the
- %% rabbitmq-server script. This is not quite the right
- %% thing to do as os_mon checks to see if memsup is
- %% available before starting it, but as memsup is
- %% available everywhere (even on VXWorks) it should be
- %% ok.
- %%
- %% One benefit of the programmatic startup is that we
- %% can add our alarm_handler before memsup is running,
- %% thus ensuring that we notice memory alarms that go
- %% off on startup.
- %%
- _ -> memsup
- end,
+ {Mod, Args} =
+ case os:type() of
+ %% memsup doesn't take account of buffers or cache when
+ %% considering "free" memory - therefore on Linux we can
+ %% get memory alarms very easily without any pressure
+ %% existing on memory at all. Therefore we need to use
+ %% our own simple memory monitor.
+ %%
+ {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
+ {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
+
+ %% Start memsup programmatically rather than via the
+ %% rabbitmq-server script. This is not quite the right
+ %% thing to do as os_mon checks to see if memsup is
+ %% available before starting it, but as memsup is
+ %% available everywhere (even on VXWorks) it should be
+ %% ok.
+ %%
+ %% One benefit of the programmatic startup is that we
+ %% can add our alarm_handler before memsup is running,
+ %% thus ensuring that we notice memory alarms that go
+ %% off on startup.
+ %%
+ _ -> {memsup, []}
+ end,
%% This is based on os_mon:childspec(memsup, true)
{ok, _} = supervisor:start_child(
os_mon_sup,
- {memsup, {Mod, start_link, []},
+ {memsup, {Mod, start_link, Args},
permanent, 2000, worker, [Mod]}),
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 198e2782..f05f7880 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -51,8 +51,6 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
--define(CALL_TIMEOUT, 5000).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -305,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f..fe2e8509 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,8 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-export([start_link/1]).
@@ -101,7 +102,8 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new()}, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -116,9 +118,9 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+noreply(NewState) -> {noreply, NewState, hibernate}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -813,11 +815,6 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State) ->
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
-
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 2dc619c1..4033aaaf 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -35,6 +35,7 @@
-export([publish/1, message/4, properties/1, delivery/4]).
-export([publish/4, publish/7]).
+-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +54,8 @@
-spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
+-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
+-spec(from_content/1 :: (content()) -> {amqp_properties(), binary()}).
-endif.
@@ -72,16 +75,26 @@ delivery(Mandatory, Immediate, Txn, Message) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message}.
+build_content(Properties, BodyBin) ->
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ #content{class_id = ClassId,
+ properties = Properties,
+ properties_bin = none,
+ payload_fragments_rev = [BodyBin]}.
+
+from_content(Content) ->
+ #content{class_id = ClassId,
+ properties = Props,
+ payload_fragments_rev = FragmentsRev} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ {Props, list_to_binary(lists:reverse(FragmentsRev))}.
+
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
Properties = properties(RawProperties),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
- Content = #content{class_id = ClassId,
- properties = Properties,
- properties_bin = none,
- payload_fragments_rev = [BodyBin]},
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKeyBin,
- content = Content,
+ content = build_content(Properties, BodyBin),
persistent_key = none}.
properties(P = #'P_basic'{}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3089bb62..16b7c938 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -89,7 +89,7 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
conserve_memory(Pid, Conserve) ->
- gen_server2:cast(Pid, {conserve_memory, Conserve}).
+ gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}).
%%---------------------------------------------------------------------------
@@ -157,14 +157,16 @@ handle_cast({conserve_memory, Conserve}, State) ->
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
noreply(State).
+handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
+ State = #ch{writer_pid = WriterPid}) ->
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(timeout, State) ->
ok = clear_permission_cache(),
- %% TODO: Once we drop support for R11B-5, we can change this to
- %% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]).
+ {noreply, State, hibernate}.
terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
state = terminating}) ->
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 6649899a..37e4d189 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -36,7 +36,7 @@
-record(params, {quiet, node, command, args}).
--define(RPC_TIMEOUT, 30000).
+-define(RPC_TIMEOUT, infinity).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2be00503..b789fbd1 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
+-define(SERIAL_FILENAME, "rabbit_serial").
-record(state, {serial}).
@@ -59,17 +60,28 @@
%%----------------------------------------------------------------------------
start_link() ->
- %% The persister can get heavily loaded, and we don't want that to
- %% impact guid generation. We therefore keep the serial in a
- %% separate process rather than calling rabbit_persister:serial/0
- %% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [rabbit_persister:serial()], []).
+ [update_disk_serial()], []).
+
+update_disk_serial() ->
+ Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
+ Serial = case rabbit_misc:read_term_file(Filename) of
+ {ok, [Num]} -> Num;
+ {error, enoent} -> rabbit_persister:serial();
+ {error, Reason} ->
+ throw({error, {cannot_read_serial_file, Filename, Reason}})
+ end,
+ case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ ok -> ok;
+ {error, Reason1} ->
+ throw({error, {cannot_write_serial_file, Filename, Reason1}})
+ end,
+ Serial.
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
-%% persistent message store hasn't been deleted.
+%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@@ -77,7 +89,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl
new file mode 100644
index 00000000..b3d271c2
--- /dev/null
+++ b/src/rabbit_hooks.erl
@@ -0,0 +1,73 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_hooks).
+
+-export([start/0]).
+-export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]).
+
+-define(TableName, rabbit_hooks).
+
+-ifdef(use_specs).
+
+-spec(start/0 :: () -> 'ok').
+-spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok').
+-spec(unsubscribe/2 :: (atom(), atom()) -> 'ok').
+-spec(trigger/2 :: (atom(), list()) -> 'ok').
+-spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok').
+
+-endif.
+
+start() ->
+ ets:new(?TableName, [bag, public, named_table]),
+ ok.
+
+subscribe(Hook, HandlerName, Handler) ->
+ ets:insert(?TableName, {Hook, HandlerName, Handler}),
+ ok.
+
+unsubscribe(Hook, HandlerName) ->
+ ets:match_delete(?TableName, {Hook, HandlerName, '_'}),
+ ok.
+
+trigger(Hook, Args) ->
+ Hooks = ets:lookup(?TableName, Hook),
+ [case catch apply(M, F, [Hook, Name, Args | A]) of
+ {'EXIT', Reason} ->
+ rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p",
+ [Name, Hook, Reason]);
+ _ -> ok
+ end || {_, Name, {M, F, A}} <- Hooks],
+ ok.
+
+notify_remote(Hook, HandlerName, Args, Pid, PidArgs) ->
+ Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]},
+ ok.
diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl
new file mode 100644
index 00000000..b0d57cb2
--- /dev/null
+++ b/src/rabbit_memsup.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([update/0]).
+
+-record(state, {memory_fraction,
+ timeout,
+ timer,
+ mod,
+ mod_state,
+ alarmed
+ }).
+
+-define(SERVER, memsup). %% must be the same as the standard memsup
+
+-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(update/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+update() ->
+ gen_server:cast(?SERVER, update).
+
+%%----------------------------------------------------------------------------
+
+init([Mod]) ->
+ Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
+ TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
+ InitState = Mod:init(),
+ State = #state { memory_fraction = Fraction,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ mod = Mod,
+ mod_state = InitState,
+ alarmed = false },
+ {ok, internal_update(State)}.
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ TRef.
+
+%% Export the same API as the real memsup. Note that
+%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
+%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
+handle_call(get_sysmem_high_watermark, _From, State) ->
+ {reply, trunc(100 * State#state.memory_fraction), State};
+
+handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
+ {reply, ok, State#state{memory_fraction = Float}};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_memory_data, _From,
+ State = #state { mod = Mod, mod_state = ModState }) ->
+ {reply, Mod:get_memory_data(ModState), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+internal_update(State = #state { memory_fraction = MemoryFraction,
+ alarmed = Alarmed,
+ mod = Mod, mod_state = ModState }) ->
+ ModState1 = Mod:update(ModState),
+ {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
+ NewAlarmed = MemUsed / MemTotal > MemoryFraction,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ alarm_handler:set_alarm({system_memory_high_watermark, []});
+ {true, false} ->
+ alarm_handler:clear_alarm(system_memory_high_watermark);
+ _ ->
+ ok
+ end,
+ State #state { mod_state = ModState1, alarmed = NewAlarmed }.
diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl
new file mode 100644
index 00000000..3de2d843
--- /dev/null
+++ b/src/rabbit_memsup_darwin.erl
@@ -0,0 +1,88 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup_darwin).
+
+-export([init/0, update/1, get_memory_data/1]).
+
+-record(state, {total_memory,
+ allocated_memory}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
+
+update(State) ->
+ File = os:cmd("/usr/bin/vm_stat"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
+ [PageSize, Inactive, Active, Free, Wired] =
+ [dict:fetch(Key, Dict) ||
+ Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
+ 'Pages wired down']],
+ MemTotal = PageSize * (Inactive + Active + Free + Wired),
+ MemUsed = PageSize * (Active + Wired),
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
+
+%%----------------------------------------------------------------------------
+
+%% A line looks like "Foo bar: 123456."
+parse_line(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ case Name of
+ "Mach Virtual Memory Statistics" ->
+ ["(page", "size", "of", PageSize, "bytes)"] =
+ string:tokens(RHS, " "),
+ {page_size, list_to_integer(PageSize)};
+ _ ->
+ [Value | _Rest1] = string:tokens(RHS, " ."),
+ {list_to_atom(Name), list_to_integer(Value)}
+ end.
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
index ffdc7e99..ca942d7c 100644
--- a/src/rabbit_memsup_linux.erl
+++ b/src/rabbit_memsup_linux.erl
@@ -31,104 +31,44 @@
-module(rabbit_memsup_linux).
--behaviour(gen_server).
+-export([init/0, update/1, get_memory_data/1]).
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([update/0]).
-
--define(SERVER, memsup). %% must be the same as the standard memsup
-
--define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-
--record(state, {memory_fraction, alarmed, timeout, timer}).
+-record(state, {total_memory,
+ allocated_memory}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(update/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
-update() ->
- gen_server:cast(?SERVER, update).
+-endif.
%%----------------------------------------------------------------------------
-init(_Args) ->
- Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
- TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- {ok, #state{alarmed = false,
- memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef}}.
-
-start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
- TRef.
-
-%% Export the same API as the real memsup. Note that
-%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
-%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
-handle_call(get_sysmem_high_watermark, _From, State) ->
- {reply, trunc(100 * State#state.memory_fraction), State};
-
-handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
- {reply, ok, State#state{memory_fraction = Float}};
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
-
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(update, State = #state{alarmed = Alarmed,
- memory_fraction = MemoryFraction}) ->
+update(State) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- MemTotal = dict:fetch('MemTotal', Dict),
- MemUsed = MemTotal
- - dict:fetch('MemFree', Dict)
- - dict:fetch('Buffers', Dict)
- - dict:fetch('Cached', Dict),
- NewAlarmed = MemUsed / MemTotal > MemoryFraction,
- case {Alarmed, NewAlarmed} of
- {false, true} ->
- alarm_handler:set_alarm({system_memory_high_watermark, []});
- {true, false} ->
- alarm_handler:clear_alarm(system_memory_high_watermark);
- _ ->
- ok
- end,
- {noreply, State#state{alarmed = NewAlarmed}};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ [MemTotal, MemFree, Buffers, Cached] =
+ [dict:fetch(Key, Dict) ||
+ Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
+ MemUsed = MemTotal - MemFree - Buffers - Cached,
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
%%----------------------------------------------------------------------------
@@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) ->
%% A line looks like "FooBar: 123456 kB"
parse_line(Line) ->
- [Name, Value | _] = string:tokens(Line, ": "),
- {list_to_atom(Name), list_to_integer(Value)}.
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ [Value | UnitsRest] = string:tokens(RHS, " "),
+ Value1 = case UnitsRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 72e16f0f..95a274e3 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -41,6 +41,7 @@
-export([dirty_read/1]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
+-export([enable_cover/1, report_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -49,9 +50,11 @@
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
+-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@@ -64,6 +67,8 @@
-include_lib("kernel/include/inet.hrl").
+-type(ok_or_error() :: 'ok' | {'error', any()}).
+
-spec(method_record_type/1 :: (tuple()) -> atom()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
@@ -87,8 +92,10 @@
-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
--spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
+-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
+-spec(enable_cover/1 :: (string()) -> ok_or_error()).
+-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
@@ -97,7 +104,7 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@@ -107,12 +114,16 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
--spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
+-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
+-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (string(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
+-spec(ceil/1 :: (number()) -> number()).
-endif.
@@ -188,17 +199,27 @@ rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
[Kind, Name, VHostPath])).
enable_cover() ->
- case cover:compile_beam_directory("ebin") of
+ enable_cover(".").
+
+enable_cover([Root]) when is_atom(Root) ->
+ enable_cover(atom_to_list(Root));
+enable_cover(Root) ->
+ case cover:compile_beam_directory(filename:join(Root, "ebin")) of
{error,Reason} -> {error,Reason};
_ -> ok
end.
report_cover() ->
- Dir = "cover/",
- ok = filelib:ensure_dir(Dir),
+ report_cover(".").
+
+report_cover([Root]) when is_atom(Root) ->
+ report_cover(atom_to_list(Root));
+report_cover(Root) ->
+ Dir = filename:join(Root, "cover"),
+ ok = filelib:ensure_dir(filename:join(Dir,"junk")),
lists:foreach(fun(F) -> file:delete(F) end,
- filelib:wildcard(Dir ++ "*.html")),
- {ok, SummaryFile} = file:open(Dir ++ "summary.txt", [write]),
+ filelib:wildcard(filename:join(Dir, "*.html"))),
+ {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]),
{CT, NCT} =
lists:foldl(
fun(M,{CovTot, NotCovTot}) ->
@@ -207,7 +228,7 @@ report_cover() ->
Cov, NotCov, M),
{ok,_} = cover:analyze_to_file(
M,
- Dir ++ atom_to_list(M) ++ ".html",
+ filename:join(Dir, atom_to_list(M) ++ ".html"),
[html]),
{CovTot+Cov, NotCovTot+NotCov}
end,
@@ -347,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) ->
end.
dirty_dump_log(FileName) ->
- {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]),
+ {ok, LH} = disk_log:open([{name, dirty_dump_log},
+ {mode, read_only},
+ {file, FileName}]),
dirty_dump_log1(LH, disk_log:chunk(LH, start)),
disk_log:close(LH).
@@ -361,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+read_term_file(File) -> file:consult(File).
+
+write_term_file(File, Terms) ->
+ file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
append_file(File, Suffix) ->
case file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -431,3 +460,18 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> {Acc, Init}
+ end.
+
+ceil(N) ->
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 575ecb0a..37e20335 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -149,6 +149,11 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+replicated_table_names() ->
+ [Tab || {Tab, Attrs} <- table_definitions(),
+ not lists:member({local_content, true}, Attrs)
+ ].
+
dir() -> mnesia:system_info(directory).
ensure_mnesia_dir() ->
@@ -192,28 +197,16 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} -> Device;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end,
- try
- ok = io:write(Handle, ClusterNodes),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({error, {cannot_close_cluster_nodes_config,
- FileName, Reason1}})
- end
- end,
- ok.
+ case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_cluster_nodes_config,
+ FileName, Reason}})
+ end.
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case file:consult(FileName) of
+ case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
case application:get_env(cluster_config) of
@@ -250,12 +243,10 @@ delete_cluster_nodes_config() ->
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes.
init_db(ClusterNodes) ->
- WasDiskNode = mnesia:system_info(use_dir),
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
{ok, []} ->
- if WasDiskNode and IsDiskNode ->
+ case mnesia:system_info(use_dir) of
+ true ->
case check_schema_integrity() of
ok ->
ok;
@@ -270,22 +261,18 @@ init_db(ClusterNodes) ->
ok = move_db(),
ok = create_schema()
end;
- WasDiskNode ->
- throw({error, {cannot_convert_disk_node_to_ram_node,
- ClusterNodes}});
- IsDiskNode ->
- ok = create_schema();
- true ->
- throw({error, {unable_to_contact_cluster_nodes,
- ClusterNodes}})
+ false ->
+ ok = create_schema()
end;
{ok, [_|_]} ->
- ok = wait_for_tables(),
- ok = create_local_table_copies(
- case IsDiskNode of
- true -> disc;
- false -> ram
- end);
+ IsDiskNode = ClusterNodes == [] orelse
+ lists:member(node(), ClusterNodes),
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(case IsDiskNode of
+ true -> disc;
+ false -> ram
+ end);
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -336,40 +323,36 @@ create_tables() ->
table_definitions()),
ok.
+table_has_copy_type(TabDef, DiscType) ->
+ lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
+
create_local_table_copies(Type) ->
- ok = if Type /= ram -> create_local_table_copy(schema, disc_copies);
- true -> ok
- end,
lists:foreach(
fun({Tab, TabDef}) ->
- HasDiscCopies =
- lists:keymember(disc_copies, 1, TabDef),
- HasDiscOnlyCopies =
- lists:keymember(disc_only_copies, 1, TabDef),
+ HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
+ HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
+ LocalTab = proplists:get_bool(local_content, TabDef),
StorageType =
- case Type of
- disc ->
+ if
+ Type =:= disc orelse LocalTab ->
if
- HasDiscCopies -> disc_copies;
+ HasDiscCopies -> disc_copies;
HasDiscOnlyCopies -> disc_only_copies;
- true -> ram_copies
+ true -> ram_copies
end;
%% unused code - commented out to keep dialyzer happy
-%% disc_only ->
+%% Type =:= disc_only ->
%% if
%% HasDiscCopies or HasDiscOnlyCopies ->
%% disc_only_copies;
%% true -> ram_copies
%% end;
- ram ->
+ Type =:= ram ->
ram_copies
end,
ok = create_local_table_copy(Tab, StorageType)
end,
table_definitions()),
- ok = if Type == ram -> create_local_table_copy(schema, ram_copies);
- true -> ok
- end,
ok.
create_local_table_copy(Tab, Type) ->
@@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_tables() ->
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
case check_schema_integrity() of
ok ->
- case mnesia:wait_for_tables(table_names(), 30000) of
+ case mnesia:wait_for_tables(TableNames, 30000) of
ok -> ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
new file mode 100644
index 00000000..71278bfb
--- /dev/null
+++ b/src/rabbit_plugin_activator.erl
@@ -0,0 +1,198 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_plugin_activator).
+
+-export([start/0, stop/0]).
+
+-define(DefaultPluginDir, "plugins").
+-define(DefaultUnpackedPluginDir, "priv/plugins").
+-define(DefaultRabbitEBin, "ebin").
+-define(BaseApps, [rabbit]).
+
+%%----------------------------------------------------------------------------
+
+start() ->
+ %% Ensure Rabbit is loaded so we can access it's environment
+ application:load(rabbit),
+
+ %% Determine our various directories
+ PluginDir = get_env(plugins_dir, ?DefaultPluginDir),
+ UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
+ RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
+
+ %% Unpack any .ez plugins
+ unpack_ez_plugins(PluginDir, UnpackedPluginDir),
+
+ %% Build a list of required apps based on the fixed set, and any plugins
+ RequiredApps = ?BaseApps ++
+ find_plugins(PluginDir) ++
+ find_plugins(UnpackedPluginDir),
+
+ %% Build the entire set of dependencies - this will load the
+ %% applications along the way
+ AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
+ {unknown_app, {App, Err}} ->
+ io:format("ERROR: Failed to load application " ++
+ "~s: ~p~n", [App, Err]),
+ halt(1);
+ AppList ->
+ AppList
+ end,
+ AppVersions = [determine_version(App) || App <- AllApps],
+ {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
+
+ %% Build the overall release descriptor
+ RDesc = {release,
+ {"rabbit", RabbitVersion},
+ {erts, erlang:system_info(version)},
+ AppVersions},
+
+ %% Write it out to ebin/rabbit.rel
+ file:write_file(RabbitEBin ++ "/rabbit.rel",
+ io_lib:format("~p.~n", [RDesc])),
+
+ %% Compile the script
+ case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of
+ {ok, Module, Warnings} ->
+ %% This gets lots of spurious no-source warnings when we
+ %% have .ez files, so we want to supress them to prevent
+ %% hiding real issues.
+ WarningStr = Module:format_warning(
+ [W || W <- Warnings,
+ case W of
+ {warning, {source_not_found, _}} -> false;
+ _ -> true
+ end]),
+ case length(WarningStr) of
+ 0 -> ok;
+ _ -> io:format("~s", [WarningStr])
+ end,
+ ok;
+ {error, Module, Error} ->
+ io:format("Boot file generation failed: ~s~n",
+ [Module:format_error(Error)]),
+ halt(1)
+ end,
+ halt(),
+ ok.
+
+stop() ->
+ ok.
+
+get_env(Key, Default) ->
+ case application:get_env(rabbit, Key) of
+ {ok, V} -> V;
+ _ -> Default
+ end.
+
+determine_version(App) ->
+ application:load(App),
+ {ok, Vsn} = application:get_key(App, vsn),
+ {App, Vsn}.
+
+assert_dir(Dir) ->
+ case filelib:is_dir(Dir) of
+ true -> ok;
+ false ->
+ ok = filelib:ensure_dir(Dir),
+ ok = file:make_dir(Dir)
+ end.
+delete_dir(Dir) ->
+ case filelib:is_dir(Dir) of
+ true ->
+ case file:list_dir(Dir) of
+ {ok, Files} ->
+ [case Dir ++ "/" ++ F of
+ Fn ->
+ case filelib:is_dir(Fn) and not(is_symlink(Fn)) of
+ true -> delete_dir(Fn);
+ false -> file:delete(Fn)
+ end
+ end || F <- Files]
+ end,
+ ok = file:del_dir(Dir);
+ false ->
+ ok
+ end.
+is_symlink(Name) ->
+ case file:read_link(Name) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+unpack_ez_plugins(PluginSrcDir, PluginDestDir) ->
+ %% Eliminate the contents of the destination directory
+ delete_dir(PluginDestDir),
+
+ assert_dir(PluginDestDir),
+ [unpack_ez_plugin(PluginName, PluginDestDir) ||
+ PluginName <- filelib:wildcard(PluginSrcDir ++ "/*.ez")].
+
+unpack_ez_plugin(PluginFn, PluginDestDir) ->
+ zip:unzip(PluginFn, [{cwd, PluginDestDir}]),
+ ok.
+
+find_plugins(PluginDir) ->
+ [prepare_dir_plugin(PluginName) ||
+ PluginName <- filelib:wildcard(PluginDir ++ "/*/ebin/*.app")].
+
+prepare_dir_plugin(PluginAppDescFn) ->
+ %% Add the plugin ebin directory to the load path
+ PluginEBinDirN = filename:dirname(PluginAppDescFn),
+ code:add_path(PluginEBinDirN),
+
+ %% We want the second-last token
+ NameTokens = string:tokens(PluginAppDescFn,"/."),
+ PluginNameString = lists:nth(length(NameTokens) - 1, NameTokens),
+ list_to_atom(PluginNameString).
+
+expand_dependencies(Pending) ->
+ expand_dependencies(sets:new(), Pending).
+expand_dependencies(Current, []) ->
+ Current;
+expand_dependencies(Current, [Next|Rest]) ->
+ case sets:is_element(Next, Current) of
+ true ->
+ expand_dependencies(Current, Rest);
+ false ->
+ case application:load(Next) of
+ ok ->
+ ok;
+ {error, {already_loaded, _}} ->
+ ok;
+ X ->
+ throw({unknown_app, {Next, X}})
+ end,
+ {ok, Required} = application:get_key(Next, applications),
+ Unique = [A || A <- Required, not(sets:is_element(A, Current))],
+ expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index a09783be..69dbc008 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 01757509..b4cd30bc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -33,6 +33,9 @@
-export([all_tests/0, test_parsing/0]).
+%% Exported so the hook mechanism can call back
+-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).
+
-import(lists).
-include("rabbit.hrl").
@@ -46,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -54,6 +58,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_hooks(),
passed.
test_priority_queue() ->
@@ -71,7 +76,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@@ -87,6 +93,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
+
+ %% merge 2 * 2-element multi-different-priority Qs
+ Q11 = priority_queue:join(Q6, Q5),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
+ [bar, foo, foo, bar]} = test_priority_queue(Q11),
+
+ %% and the other way around
+ Q12 = priority_queue:join(Q5, Q6),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
+ [bar, foo, bar, foo]} = test_priority_queue(Q12),
+
+ %% merge with negative priorities
+ Q13 = priority_queue:join(Q4, Q5),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q13),
+
+ %% and the other way around
+ Q14 = priority_queue:join(Q5, Q4),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q14),
+
+ %% joins with empty queues:
+ Q1 = priority_queue:join(Q, Q1),
+ Q1 = priority_queue:join(Q1, Q),
+
+ %% insert with priority into non-empty zero-priority queue
+ Q15 = priority_queue:in(baz, 1, Q5),
+ {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
+ test_priority_queue(Q15),
+
passed.
priority_queue_in_all(Q, L) ->
@@ -112,6 +183,14 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_unfold() ->
+ {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
+ List = lists:seq(2,20,2),
+ {List, 0} = rabbit_misc:unfold(fun (0) -> false;
+ (N) -> {true, N*2, N-1}
+ end, 10),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -404,19 +483,17 @@ test_cluster_management() ->
end,
ClusteringSequence),
- %% attempt to convert a disk node into a ram node
+ %% convert a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
- %% attempt to join a non-existing cluster as a ram node
+ %% join a non-existing cluster as a ram node
ok = control_action(reset, []),
- {error, {unable_to_contact_cluster_nodes, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
SecondaryNode = rabbit_misc:localnode(hare),
case net_adm:ping(SecondaryNode) of
@@ -432,11 +509,12 @@ test_cluster_management2(SecondaryNode) ->
NodeS = atom_to_list(node()),
SecondaryNodeS = atom_to_list(SecondaryNode),
- %% attempt to convert a disk node into a ram node
+ %% make a disk node
ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
- {error, {unable_to_join_cluster, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
+ %% make a ram node
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
%% join cluster as a ram node
ok = control_action(reset, []),
@@ -449,21 +527,21 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to join non-existing cluster as a ram node
- {error, _} = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
-
+ %% join non-existing cluster as a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn ram node into disk node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to convert a disk node into a ram node
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ %% convert a disk node into a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn a disk node into a ram node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -601,6 +679,52 @@ test_server_status() ->
passed.
+test_hooks() ->
+ %% Firing of hooks calls all hooks in an isolated manner
+ rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}),
+ rabbit_hooks:trigger(test_hook, [arg1, arg2]),
+ [arg1, arg2] = get(test_hook_test_fired),
+ [arg1, arg2] = get(test_hook_test2_fired),
+ undefined = get(test_hook2_test2_fired),
+
+ %% Hook Deletion works
+ put(test_hook_test_fired, undefined),
+ put(test_hook_test2_fired, undefined),
+ rabbit_hooks:unsubscribe(test_hook, test),
+ rabbit_hooks:trigger(test_hook, [arg3, arg4]),
+ undefined = get(test_hook_test_fired),
+ [arg3, arg4] = get(test_hook_test2_fired),
+ undefined = get(test_hook2_test2_fired),
+
+ %% Catches exceptions from bad hooks
+ rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}),
+ ok = rabbit_hooks:trigger(test_hook3, []),
+
+ %% Passing extra arguments to hooks
+ rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}),
+ rabbit_hooks:trigger(arg_hook, [arg1, arg2]),
+ {[arg1, arg2], 1, 3} = get(arg_hook_test_fired),
+
+ %% Invoking Pids
+ Remote = fun() ->
+ receive
+ {rabbitmq_hook,[remote_test,test,[],Target]} ->
+ Target ! invoked
+ end
+ end,
+ P = spawn(Remote),
+ rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}),
+ rabbit_hooks:trigger(remote_test, []),
+ receive
+ invoked -> ok
+ after 100 ->
+ io:format("Remote hook not invoked"),
+ throw(timeout)
+ end,
+ passed.
+
%---------------------------------------------------------------------
control_action(Command, Args) -> control_action(Command, node(), Args).
@@ -684,3 +808,11 @@ delete_log_handlers(Handlers) ->
[[] = error_logger:delete_report_handler(Handler) ||
Handler <- Handlers],
ok.
+
+handle_hook(HookName, Handler, Args) ->
+ A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired",
+ put(list_to_atom(A), Args).
+bad_handle_hook(_, _, _) ->
+ bad:bad().
+extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
+ handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 5ca294b7..1679ce7c 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,9 +33,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, shutdown/1, mainloop/1]).
--export([send_command/2, send_command/3,
- send_command_and_notify/5]).
+-export([start/3, start_link/3, shutdown/1, mainloop/1]).
+-export([send_command/2, send_command/3, send_command_and_signal_back/3,
+ send_command_and_signal_back/4, send_command_and_notify/5]).
-export([internal_send_command/3, internal_send_command/5]).
-import(gen_tcp).
@@ -49,8 +49,12 @@
-ifdef(use_specs).
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
+-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
+-spec(send_command_and_signal_back/4 ::
+ (pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
(pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
-spec(internal_send_command/3 ::
@@ -68,6 +72,11 @@ start(Sock, Channel, FrameMax) ->
channel = Channel,
frame_max = FrameMax}]).
+start_link(Sock, Channel, FrameMax) ->
+ spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}]).
+
mainloop(State) ->
receive
Message -> ?MODULE:mainloop(handle_message(Message, State))
@@ -86,6 +95,19 @@ handle_message({send_command, MethodRecord, Content},
ok = internal_send_command_async(Sock, Channel, MethodRecord,
Content, FrameMax),
State;
+handle_message({send_command_and_signal_back, MethodRecord, Parent},
+ State = #wstate{sock = Sock, channel = Channel}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
+handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
+ State = #wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord,
+ Content, FrameMax),
+ Parent ! rabbit_writer_send_command_signal,
+ State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
@@ -113,6 +135,14 @@ send_command(W, MethodRecord, Content) ->
W ! {send_command, MethodRecord, Content},
ok.
+send_command_and_signal_back(W, MethodRecord, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Parent},
+ ok.
+
+send_command_and_signal_back(W, MethodRecord, Content, Parent) ->
+ W ! {send_command_and_signal_back, MethodRecord, Content, Parent},
+ ok.
+
send_command_and_notify(W, Q, ChPid, MethodRecord, Content) ->
W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content},
ok.