summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSteve Powell <steve@rabbitmq.com>2012-02-22 11:13:28 +0000
committerSteve Powell <steve@rabbitmq.com>2012-02-22 11:13:28 +0000
commit0227f7f00ae7c05402d3214c8d9639ffa4f2c625 (patch)
treec2bc721e955ac985c9c1bef3a7799be8f4de999d
parent35eaf39946377976c94ca067dee453424e83d239 (diff)
parent6d6d9d8341247e636476217844a86df6e30779ef (diff)
downloadrabbitmq-server-0227f7f00ae7c05402d3214c8d9639ffa4f2c625.tar.gz
Merge default in
-rw-r--r--LICENSE-MPL-RabbitMQ2
-rw-r--r--Makefile4
-rw-r--r--codegen.py4
-rw-r--r--docs/html-to-website-xml.xsl44
-rw-r--r--docs/rabbitmqctl.1.xml30
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--include/gm_specs.hrl2
-rw-r--r--include/rabbit.hrl15
-rw-r--r--include/rabbit_auth_backend_spec.hrl2
-rw-r--r--include/rabbit_auth_mechanism_spec.hrl2
-rw-r--r--include/rabbit_backing_queue_spec.hrl2
-rw-r--r--include/rabbit_exchange_type_spec.hrl2
-rw-r--r--include/rabbit_msg_store.hrl2
-rw-r--r--include/rabbit_msg_store_index.hrl2
-rw-r--r--packaging/common/LICENSE.tail4
-rw-r--r--packaging/common/rabbitmq-script-wrapper2
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf2
-rw-r--r--packaging/debs/Debian/Makefile2
-rw-r--r--packaging/debs/Debian/debian/control2
-rw-r--r--packaging/windows-exe/rabbitmq_nsi.in2
-rw-r--r--packaging/windows/Makefile3
-rwxr-xr-xscripts/rabbitmq-env2
-rwxr-xr-xscripts/rabbitmq-plugins2
-rwxr-xr-xscripts/rabbitmq-plugins.bat2
-rwxr-xr-xscripts/rabbitmq-server2
-rwxr-xr-xscripts/rabbitmq-server.bat2
-rwxr-xr-xscripts/rabbitmq-service.bat2
-rwxr-xr-xscripts/rabbitmqctl2
-rwxr-xr-x[-rw-r--r--]scripts/rabbitmqctl.bat2
-rw-r--r--src/credit_flow.erl127
-rw-r--r--src/delegate.erl2
-rw-r--r--src/delegate_sup.erl2
-rw-r--r--src/file_handle_cache.erl2
-rw-r--r--src/gatherer.erl2
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/gm.erl2
-rw-r--r--src/gm_soak_test.erl2
-rw-r--r--src/gm_speed_test.erl2
-rw-r--r--src/gm_tests.erl2
-rw-r--r--src/lqueue.erl2
-rw-r--r--src/mirrored_supervisor.erl2
-rw-r--r--src/mirrored_supervisor_tests.erl2
-rw-r--r--src/mnesia_sync.erl77
-rw-r--r--src/pg_local.erl2
-rw-r--r--src/priority_queue.erl2
-rw-r--r--src/rabbit.erl38
-rw-r--r--src/rabbit_access_control.erl3
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_amqqueue.erl101
-rw-r--r--src/rabbit_amqqueue_process.erl111
-rw-r--r--src/rabbit_amqqueue_sup.erl2
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl2
-rw-r--r--src/rabbit_auth_mechanism_cr_demo.erl2
-rw-r--r--src/rabbit_auth_mechanism_plain.erl2
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_basic.erl4
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_channel.erl283
-rw-r--r--src/rabbit_channel_sup.erl2
-rw-r--r--src/rabbit_channel_sup_sup.erl2
-rw-r--r--src/rabbit_client_sup.erl2
-rw-r--r--src/rabbit_command_assembler.erl2
-rw-r--r--src/rabbit_connection_sup.erl2
-rw-r--r--src/rabbit_control.erl28
-rw-r--r--src/rabbit_direct.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl2
-rw-r--r--src/rabbit_event.erl2
-rw-r--r--src/rabbit_exchange.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_exchange_type_direct.erl2
-rw-r--r--src/rabbit_exchange_type_fanout.erl2
-rw-r--r--src/rabbit_exchange_type_headers.erl2
-rw-r--r--src/rabbit_exchange_type_topic.erl5
-rw-r--r--src/rabbit_file.erl2
-rw-r--r--src/rabbit_framing.erl2
-rw-r--r--src/rabbit_guid.erl83
-rw-r--r--src/rabbit_heartbeat.erl2
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_log.erl112
-rw-r--r--src/rabbit_memory_monitor.erl2
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_misc.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl117
-rw-r--r--src/rabbit_mirror_queue_slave_sup.erl2
-rw-r--r--src/rabbit_misc.erl81
-rw-r--r--src/rabbit_mnesia.erl72
-rw-r--r--src/rabbit_msg_file.erl2
-rw-r--r--src/rabbit_msg_store.erl67
-rw-r--r--src/rabbit_msg_store_ets_index.erl2
-rw-r--r--src/rabbit_msg_store_gc.erl2
-rw-r--r--src/rabbit_msg_store_index.erl2
-rw-r--r--src/rabbit_net.erl22
-rw-r--r--src/rabbit_networking.erl21
-rw-r--r--src/rabbit_node_monitor.erl25
-rw-r--r--src/rabbit_nodes.erl94
-rw-r--r--src/rabbit_plugins.erl2
-rw-r--r--src/rabbit_prelaunch.erl24
-rw-r--r--src/rabbit_queue_collector.erl2
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_reader.erl200
-rw-r--r--src/rabbit_registry.erl2
-rw-r--r--src/rabbit_restartable_sup.erl2
-rw-r--r--src/rabbit_router.erl2
-rw-r--r--src/rabbit_sasl_report_file_h.erl2
-rw-r--r--src/rabbit_ssl.erl10
-rw-r--r--src/rabbit_sup.erl2
-rw-r--r--src/rabbit_tests.erl76
-rw-r--r--src/rabbit_tests_event_receiver.erl2
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_types.erl2
-rw-r--r--src/rabbit_upgrade.erl2
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl19
-rw-r--r--src/rabbit_version.erl2
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--src/rabbit_writer.erl7
-rw-r--r--src/supervisor2.erl2
-rw-r--r--src/tcp_acceptor.erl36
-rw-r--r--src/tcp_acceptor_sup.erl2
-rw-r--r--src/tcp_listener.erl7
-rw-r--r--src/tcp_listener_sup.erl2
-rw-r--r--src/test_sup.erl2
-rw-r--r--src/vm_memory_monitor.erl2
-rw-r--r--src/worker_pool.erl2
-rw-r--r--src/worker_pool_sup.erl2
-rw-r--r--src/worker_pool_worker.erl2
134 files changed, 1316 insertions, 868 deletions
diff --git a/LICENSE-MPL-RabbitMQ b/LICENSE-MPL-RabbitMQ
index 14bcc21d..d50e32ef 100644
--- a/LICENSE-MPL-RabbitMQ
+++ b/LICENSE-MPL-RabbitMQ
@@ -447,7 +447,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/Makefile b/Makefile
index 31c71dd4..948c6c06 100644
--- a/Makefile
+++ b/Makefile
@@ -347,10 +347,10 @@ $(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML))))
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
# clean in its name, unless you know that you don't need any of the
-# automatic dependency generation for that target (eg cleandb).
+# automatic dependency generation for that target (e.g. cleandb).
# We want to load the dep file if *any* target *doesn't* contain
-# "clean" - i.e. if removing all clean-like targets leaves something
+# "clean" - i.e. if removing all clean-like targets leaves something.
ifeq "$(MAKECMDGOALS)" ""
TESTABLEGOALS:=$(.DEFAULT_GOAL)
diff --git a/codegen.py b/codegen.py
index 494be73d..9483e854 100644
--- a/codegen.py
+++ b/codegen.py
@@ -11,7 +11,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
from __future__ import nested_scopes
@@ -118,7 +118,7 @@ def printFileHeader():
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%"""
def genErl(spec):
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 88aa2e78..d83d5073 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -8,8 +8,6 @@
<xsl:output method="xml" />
-<xsl:template match="*"/>
-
<!-- Copy every element through -->
<xsl:template match="*">
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
@@ -28,36 +26,30 @@
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
- <body>
- <doc:div>
- <xsl:choose>
+ <body show-in-this-page="true">
+ <xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
- <p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
- </p>
- <p>
- <a href="../manpages.html">See a list of all manual pages</a>.
- </p>
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="../manpages.html">See a list of all manual pages</a>.
+ </p>
</xsl:when>
<xsl:otherwise>
- <p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
- </p>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
</xsl:otherwise>
- </xsl:choose>
- <p>
+ </xsl:choose>
+ <p>
For more general documentation, please see the
- <a href="../admin-guide.html">administrator's guide</a>.
- </p>
-
- <doc:toc class="compact">
- <doc:heading>Table of Contents</doc:heading>
- </doc:toc>
+ <a href="../admin-guide.html">administrator's guide</a>.
+ </p>
- <xsl:apply-templates select="body/div[@class='refentry']"/>
- </doc:div>
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
</body>
</html>
</xsl:template>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 7268f090..c1c51f9f 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -266,10 +266,9 @@
</para>
<para>
When the target files do not exist they are created.
- target files do not already exist. When
- no <option>suffix</option> is specified, the empty log
- files are simply created at the original location; no
- rotation takes place.
+ When no <option>suffix</option> is specified, the empty
+ log files are simply created at the original location;
+ no rotation takes place.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl rotate_logs .1</screen>
@@ -1067,10 +1066,26 @@
<listitem><para>The period for which the peer's SSL
certificate is valid.</para></listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>last_blocked_by</term>
+ <listitem><para>The reason for which this connection
+ was last blocked. One of 'mem' - due to a memory
+ alarm, 'flow' - due to internal flow control, or
+ 'none' if the connection was never
+ blocked.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>last_blocked_age</term>
+ <listitem><para>Time, in seconds, since this
+ connection was last blocked, or
+ 'infinity'.</para></listitem>
+ </varlistentry>
+
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1127,8 +1142,9 @@
</varlistentry>
</variablelist>
<para>
- If no <command>connectioninfoitem</command>s are specified then user, peer
- address, peer port and connection state are displayed.
+ If no <command>connectioninfoitem</command>s are
+ specified then user, peer address, peer port, time since
+ flow control and memory block state are displayed.
</para>
<para role="example-prefix">
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 9301af6b..2fee1114 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -37,6 +37,7 @@
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
{trace_vhosts, []},
+ {log_levels, [{connection, info}]},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/include/gm_specs.hrl b/include/gm_specs.hrl
index ee29706e..a317e63b 100644
--- a/include/gm_specs.hrl
+++ b/include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d81b82db..faf3059a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -86,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
@@ -95,16 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(CREDIT_DISC_BOUND, {2000, 500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
diff --git a/include/rabbit_auth_backend_spec.hrl b/include/rabbit_auth_backend_spec.hrl
index 803bb75c..61a2e22a 100644
--- a/include/rabbit_auth_backend_spec.hrl
+++ b/include/rabbit_auth_backend_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl
index 614a3eed..9a2f5e05 100644
--- a/include/rabbit_auth_mechanism_spec.hrl
+++ b/include/rabbit_auth_mechanism_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 4a657951..2a8cc13c 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-type(fetch_result(Ack) ::
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index f6283ef7..8f7e22d3 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index e9150a97..f7c10bd8 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
diff --git a/include/rabbit_msg_store_index.hrl b/include/rabbit_msg_store_index.hrl
index 2ae5b000..75d7eb71 100644
--- a/include/rabbit_msg_store_index.hrl
+++ b/include/rabbit_msg_store_index.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit_msg_store.hrl").
diff --git a/packaging/common/LICENSE.tail b/packaging/common/LICENSE.tail
index 5d842cc1..b9c2629b 100644
--- a/packaging/common/LICENSE.tail
+++ b/packaging/common/LICENSE.tail
@@ -56,7 +56,7 @@ The rest of this package is licensed under the Mozilla Public License 1.1
Authors and Copyright are as described below:
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
MOZILLA PUBLIC LICENSE
@@ -508,7 +508,7 @@ EXHIBIT A -Mozilla Public License.
The Original Code is RabbitMQ.
The Initial Developer of the Original Code is VMware, Inc.
- Copyright (c) 2007-2011 VMware, Inc. All rights reserved.''
+ Copyright (c) 2007-2012 VMware, Inc. All rights reserved.''
[NOTE: The text of this Exhibit A may differ slightly from the text of
the notices in the Source Code files of the Original Code. You should
diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper
index 0436f546..0e59c218 100644
--- a/packaging/common/rabbitmq-script-wrapper
+++ b/packaging/common/rabbitmq-script-wrapper
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Escape spaces and quotes, because shell is revolting.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index e6776eff..14557286 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
##
diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile
index 8696427e..79e9c1dd 100644
--- a/packaging/debs/Debian/Makefile
+++ b/packaging/debs/Debian/Makefile
@@ -34,7 +34,7 @@ package: clean
chmod a+x $(UNPACKED_DIR)/debian/rules
echo "This package was debianized by Tony Garnock-Jones <tonyg@rabbitmq.com> on\nWed, 3 Jan 2007 15:43:44 +0000.\n\nIt was downloaded from http://www.rabbitmq.com/\n\n" > $(UNPACKED_DIR)/debian/copyright
cat $(UNPACKED_DIR)/LICENSE >> $(UNPACKED_DIR)/debian/copyright
- echo "\n\nThe Debian packaging is (C) 2007-2011, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
+ echo "\n\nThe Debian packaging is (C) 2007-2012, VMware, Inc. and is licensed\nunder the MPL 1.1, see above.\n" >> $(UNPACKED_DIR)/debian/copyright
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)
rm -rf $(UNPACKED_DIR)
diff --git a/packaging/debs/Debian/debian/control b/packaging/debs/Debian/debian/control
index 45f5c5c4..fb02cd6a 100644
--- a/packaging/debs/Debian/debian/control
+++ b/packaging/debs/Debian/debian/control
@@ -2,7 +2,7 @@ Source: rabbitmq-server
Section: net
Priority: extra
Maintainer: RabbitMQ Team <packaging@rabbitmq.com>
-Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc
+Build-Depends: cdbs, debhelper (>= 5), erlang-dev, python-simplejson, xmlto, xsltproc, erlang-nox (>= 1:12.b.3), erlang-src (>= 1:12.b.3), unzip, zip
Standards-Version: 3.8.0
Package: rabbitmq-server
diff --git a/packaging/windows-exe/rabbitmq_nsi.in b/packaging/windows-exe/rabbitmq_nsi.in
index 27e4e1dc..91510991 100644
--- a/packaging/windows-exe/rabbitmq_nsi.in
+++ b/packaging/windows-exe/rabbitmq_nsi.in
@@ -37,7 +37,7 @@ VIAddVersionKey /LANG=${LANG_ENGLISH} "ProductName" "RabbitMQ Server"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "Comments" ""
VIAddVersionKey /LANG=${LANG_ENGLISH} "CompanyName" "VMware, Inc"
;VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalTrademarks" "" ; TODO ?
-VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2011 VMware, Inc. All rights reserved."
+VIAddVersionKey /LANG=${LANG_ENGLISH} "LegalCopyright" "Copyright (c) 2007-2012 VMware, Inc. All rights reserved."
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileDescription" "RabbitMQ Server"
VIAddVersionKey /LANG=${LANG_ENGLISH} "FileVersion" "%%VERSION%%"
diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile
index 828cf000..a910941b 100644
--- a/packaging/windows/Makefile
+++ b/packaging/windows/Makefile
@@ -26,6 +26,9 @@ dist:
elinks -dump -no-references -no-numbering rabbitmq-service.html \
> $(TARGET_DIR)/readme-service.txt
todos $(TARGET_DIR)/readme-service.txt
+ todos $(TARGET_DIR)/INSTALL
+ todos $(TARGET_DIR)/LICENSE*
+ todos $(TARGET_DIR)/plugins/README.txt
rm -rf $(TARGET_DIR)/plugins-src
zip -q -r $(TARGET_ZIP).zip $(TARGET_DIR)
rm -rf $(TARGET_DIR) rabbitmq-service.html
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 58677d14..1fd1339d 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Determine where this script is really located
diff --git a/scripts/rabbitmq-plugins b/scripts/rabbitmq-plugins
index 247cbfe2..14a18d57 100755
--- a/scripts/rabbitmq-plugins
+++ b/scripts/rabbitmq-plugins
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index ca874a7f..66a900a1 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index a191805f..0a5a4640 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 44ce1ce1..ca49a5d8 100755
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 1582bfb1..9e274840 100755
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index 887eeac6..4aad6b8f 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -12,7 +12,7 @@
## The Original Code is RabbitMQ.
##
## The Initial Developer of the Original Code is VMware, Inc.
-## Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+## Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
##
# Get default settings with user overrides for (RABBITMQ_)<var_name>
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index a74a91fd..f37fae48 100644..100755
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -12,7 +12,7 @@ REM
REM The Original Code is RabbitMQ.
REM
REM The Initial Developer of the Original Code is VMware, Inc.
-REM Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+REM Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
REM
setlocal
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
new file mode 100644
index 00000000..072f4d9d
--- /dev/null
+++ b/src/credit_flow.erl
@@ -0,0 +1,127 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(credit_flow).
+
+%% Credit flow is controlled by a credit specification - a
+%% {InitialCredit, MoreCreditAfter} tuple. For the message sender,
+%% credit starts at InitialCredit and is decremented with every
+%% message sent. The message receiver grants more credit to the sender
+%% by sending it a {bump_credit, ...} control message after receiving
+%% MoreCreditAfter messages. The sender should pass this message in to
+%% handle_bump_msg/1. The sender should block when it goes below 0
+%% (check by invoking blocked/0). If a process is both a sender and a
+%% receiver it will not grant any more credit to its senders when it
+%% is itself blocked - thus the only processes that need to check
+%% blocked/0 are ones that read from network sockets.
+
+-define(DEFAULT_CREDIT, {200, 50}).
+
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([peer_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+-type(credit_spec() :: {non_neg_integer(), non_neg_integer()}).
+
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(send/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(ack/2 :: (pid(), credit_spec()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(peer_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+%% For any given pair of processes, ack/2 and send/2 must always be
+%% called with the same credit_spec().
+
+send(From) -> send(From, ?DEFAULT_CREDIT).
+
+send(From, {InitialCredit, _MoreCreditAfter}) ->
+ update({credit_from, From}, InitialCredit,
+ fun (1) -> block(From),
+ 0;
+ (C) -> C - 1
+ end).
+
+ack(To) -> ack(To, ?DEFAULT_CREDIT).
+
+ack(To, {_InitialCredit, MoreCreditAfter}) ->
+ update({credit_to, To}, MoreCreditAfter,
+ fun (1) -> grant(To, MoreCreditAfter),
+ MoreCreditAfter;
+ (C) -> C - 1
+ end).
+
+handle_bump_msg({From, MoreCredit}) ->
+ update({credit_from, From}, 0,
+ fun (C) when C =< 0 andalso C + MoreCredit > 0 -> unblock(From),
+ C + MoreCredit;
+ (C) -> C + MoreCredit
+ end).
+
+blocked() -> get(credit_blocked, []) =/= [].
+
+peer_down(Peer) ->
+ %% In theory we could also remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ unblock(Peer),
+ erase({credit_from, Peer}),
+ erase({credit_to, Peer}).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> update(credit_deferred, [],
+ fun (Deferred) -> [{To, Msg} | Deferred] end)
+ end.
+
+block(From) -> update(credit_blocked, [], fun (Blocks) -> [From | Blocks] end).
+
+unblock(From) ->
+ update(credit_blocked, [], fun (Blocks) -> Blocks -- [From] end),
+ case blocked() of
+ false -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ true -> ok
+ end.
+
+get(Key, Default) ->
+ case get(Key) of
+ undefined -> Default;
+ Value -> Value
+ end.
+
+update(Key, Default, Fun) -> put(Key, Fun(get(Key, Default))), ok.
diff --git a/src/delegate.erl b/src/delegate.erl
index edb4eba4..d595e481 100644
--- a/src/delegate.erl
+++ b/src/delegate.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate).
diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl
index 4c131a6c..2a8b915b 100644
--- a/src/delegate_sup.erl
+++ b/src/delegate_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(delegate_sup).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index c11fb54b..59a0ab1c 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(file_handle_cache).
diff --git a/src/gatherer.erl b/src/gatherer.erl
index fe976b50..98b36038 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gatherer).
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 49913d26..f8537487 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2011 VMware, Inc.
+%% All modifications are (C) 2009-2012 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
diff --git a/src/gm.erl b/src/gm.erl
index 6c899122..6f9ff564 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm).
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl
index 5e5a3a5a..57217541 100644
--- a/src/gm_soak_test.erl
+++ b/src/gm_soak_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_soak_test).
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl
index defb0f29..dad75bd4 100644
--- a/src/gm_speed_test.erl
+++ b/src/gm_speed_test.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_speed_test).
diff --git a/src/gm_tests.erl b/src/gm_tests.erl
index ca0ffd64..0a2d4204 100644
--- a/src/gm_tests.erl
+++ b/src/gm_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(gm_tests).
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 04b40706..c4e046b5 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(lqueue).
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 3ba8f50d..a599effa 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index d48a9ca5..e8baabe8 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor_tests).
diff --git a/src/mnesia_sync.erl b/src/mnesia_sync.erl
new file mode 100644
index 00000000..a3773d90
--- /dev/null
+++ b/src/mnesia_sync.erl
@@ -0,0 +1,77 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(mnesia_sync).
+
+%% mnesia:sync_transaction/3 fails to guarantee that the log is flushed to disk
+%% at commit. This module is an attempt to minimise the risk of data loss by
+%% performing a coalesced log fsync. Unfortunately this is performed regardless
+%% of whether or not the log was appended to.
+
+-behaviour(gen_server).
+
+-export([sync/0]).
+
+-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+
+-record(state, {waiting, disc_node}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(sync/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+sync() ->
+ gen_server:call(?SERVER, sync, infinity).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{disc_node = mnesia:system_info(use_dir), waiting = []}}.
+
+handle_call(sync, _From, #state{disc_node = false} = State) ->
+ {reply, ok, State};
+handle_call(sync, From, #state{waiting = Waiting} = State) ->
+ {noreply, State#state{waiting = [From | Waiting]}, 0};
+handle_call(Request, _From, State) ->
+ {stop, {unhandled_call, Request}, State}.
+
+handle_cast(Request, State) ->
+ {stop, {unhandled_cast, Request}, State}.
+
+handle_info(timeout, #state{waiting = Waiting} = State) ->
+ ok = disk_log:sync(latest_log),
+ [gen_server:reply(From, ok) || From <- Waiting],
+ {noreply, State#state{waiting = []}};
+handle_info(Message, State) ->
+ {stop, {unhandled_info, Message}, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/src/pg_local.erl b/src/pg_local.erl
index c9c3a3a7..e2e82f1f 100644
--- a/src/pg_local.erl
+++ b/src/pg_local.erl
@@ -13,7 +13,7 @@
%% versions of Erlang/OTP. The remaining type specs have been
%% removed.
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%% %CopyrightBegin%
%%
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 4fc8b469..780fa2e9 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 9609eb04..0a0ca90a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit).
@@ -45,6 +45,12 @@
{requires, file_handle_cache},
{enables, external_infrastructure}]}).
+-rabbit_boot_step({database_sync,
+ [{description, "database sync"},
+ {mfa, {rabbit_sup, start_child, [mnesia_sync]}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
-rabbit_boot_step({file_handle_cache,
[{description, "file handle cache server"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -191,7 +197,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -360,10 +366,8 @@ rotate_logs(BinarySuffix) ->
start(normal, []) ->
case erts_version_check() of
ok ->
- ok = rabbit_mnesia:delete_previously_running_nodes(),
{ok, SupPid} = rabbit_sup:start_link(),
true = register(rabbit, self()),
-
print_banner(),
[ok = run_boot_step(Step) || Step <- boot_steps()],
io:format("~nbroker running~n"),
@@ -442,8 +446,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -502,8 +505,27 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ {Err, Nodes} =
+ case rabbit_mnesia:read_previously_running_nodes() of
+ [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
+ " shut down forcefully~nit cannot determine which nodes"
+ " are timing out. Details on all nodes will~nfollow.~n",
+ rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ Ns -> {rabbit_misc:format(
+ "Timeout contacting cluster nodes: ~p.~n", [Ns]),
+ Ns}
+ end,
+ boot_error(Err ++ rabbit_nodes:diagnostics(Nodes) ++ "~n~n", []);
+
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
@@ -657,7 +679,7 @@ print_banner() ->
{"app descriptor", app_location()},
{"home dir", home_dir()},
{"config file(s)", config_files()},
- {"cookie hash", rabbit_misc:cookie_hash()},
+ {"cookie hash", rabbit_nodes:cookie_hash()},
{"log", log_location(kernel)},
{"sasl log", log_location(sasl)},
{"database dir", rabbit_mnesia:dir()},
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d686..75c53511 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_access_control).
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 517dd4ec..187ec1ab 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_alarm).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41e644f2..a7dfd535 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
@@ -20,12 +20,12 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,6 +40,8 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -120,6 +122,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -135,6 +139,7 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
@@ -425,39 +430,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver([], #delivery{mandatory = false, immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = qpids(Qs),
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
- {routed, QPids};
-
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
- end.
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -489,7 +464,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -549,6 +538,46 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 161f9787..12cd0c93 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--define(UNSENT_MESSAGE_LIMIT, 100).
+-define(UNSENT_MESSAGE_LIMIT, 200).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -135,7 +134,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -390,34 +388,32 @@ ch_record_state_transition(OldCR, NewCR) ->
{_, _} -> ok
end.
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
+deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ {true, State};
+deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
- case PredFun(FunAcc, State) of
- false -> {FunAcc, State};
- true -> case queue:out(ActiveConsumers) of
- {empty, _} ->
- {FunAcc, State};
- {{value, QEntry}, Tail} ->
- {FunAcc1, State1} =
- deliver_msg_to_consumer(
+ case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {false, State};
+ {{value, QEntry}, Tail} ->
+ {Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry,
- FunAcc, State#q{active_consumers = Tail}),
- deliver_msgs_to_consumers(Funs, FunAcc1, State1)
- end
+ State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {FunAcc, State};
+ {false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
- {FunAcc, State};
+ {false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
deliver_msg_to_consumer(
- DeliverFun, Consumer, C, FunAcc,
+ DeliverFun, Consumer, C,
State#q{active_consumers = AC1})
end
end.
@@ -428,9 +424,9 @@ deliver_msg_to_consumer(DeliverFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
+ State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, Stop, State1} =
+ DeliverFun(AckRequired, State),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -439,11 +435,9 @@ deliver_msg_to_consumer(DeliverFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {FunAcc1, State1}.
-
-deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
+ {Stop, State1}.
-deliver_from_queue_deliver(AckRequired, false, State) ->
+deliver_from_queue_deliver(AckRequired, State) ->
{{Message, IsDelivered, AckTag, Remaining}, State1} =
fetch(AckRequired, State),
{{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
@@ -487,12 +481,11 @@ maybe_record_confirm_message(_Confirm, State) ->
State.
run_message_queue(State) ->
- Funs = {fun deliver_from_queue_pred/2,
- fun deliver_from_queue_deliver/3},
State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(State),
- IsEmpty = BQ:is_empty(BQS),
- {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1),
+ {_IsEmpty1, State2} = deliver_msgs_to_consumers(
+ fun deliver_from_queue_deliver/2,
+ BQ:is_empty(BQS), State1),
State2.
attempt_delivery(Delivery = #delivery{sender = ChPid,
@@ -506,10 +499,8 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
end,
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
- PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
- fun (AckRequired, false,
- State1 = #q{backing_queue_state = BQS2}) ->
+ fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
%% we don't need an expiry here because
%% messages are not being enqueued, so we use
%% an empty message_properties.
@@ -523,7 +514,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid,
State1#q{backing_queue_state = BQS3}}
end,
{Delivered, State2} =
- deliver_msgs_to_consumers({ PredFun, DeliverFun }, false,
+ deliver_msgs_to_consumers(DeliverFun, false,
State#q{backing_queue_state = BQS1}),
{Delivered, Confirm, State2};
{Duplicate, BQS1} ->
@@ -598,6 +589,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -713,15 +710,12 @@ infos(Items, State) ->
|| Item <- (Items1 -- [synchronised_slave_pids])].
slaves_status(#q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
- rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined ->
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} ->
[{slave_pids, ''}, {synchronised_slave_pids, ''}];
- _ ->
+ {ok, #amqqueue{slave_pids = SPids}} ->
{Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ delegate:invoke(SPids, fun rabbit_mirror_queue_slave:info/1),
{SPids1, SSPids} =
lists:foldl(
fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
@@ -765,11 +759,9 @@ i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{mirror_nodes = MNodes,
- slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- case MNodes of
- undefined -> [];
- _ -> SPids
+ case rabbit_amqqueue:lookup(Name) of
+ {ok, #amqqueue{mirror_nodes = undefined}} -> [];
+ {ok, #amqqueue{slave_pids = SPids}} -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -826,7 +818,7 @@ prioritise_cast(Msg, _State) ->
{set_maximum_since_use, _Age} -> 8;
{ack, _AckTags, _ChPid} -> 7;
{reject, _AckTags, _Requeue, _ChPid} -> 7;
- {notify_sent, _ChPid} -> 7;
+ {notify_sent, _ChPid, _Credit} -> 7;
{unblock, _ChPid} -> 7;
{run_backing_queue, _Mod, _Fun} -> 6;
_ -> 0
@@ -1018,8 +1010,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
@@ -1051,11 +1052,11 @@ handle_cast({unblock, ChPid}, State) ->
possibly_unblock(State, ChPid,
fun (C) -> C#cr{is_limit_active = false} end));
-handle_cast({notify_sent, ChPid}, State) ->
+handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
- C#cr{unsent_message_count = Count - 1}
+ C#cr{unsent_message_count = Count - Credit}
end));
handle_cast({limit, ChPid, Limiter}, State) ->
@@ -1099,8 +1100,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- {stop, normal, State};
+ true -> {stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1147,8 +1147,11 @@ handle_info(timeout, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_amqqueue_sup.erl b/src/rabbit_amqqueue_sup.erl
index 7b3ebcf2..a4305e5f 100644
--- a/src/rabbit_amqqueue_sup.erl
+++ b/src/rabbit_amqqueue_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_sup).
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index ade158bb..e0e252b8 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 086a90b4..3ef81d32 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend_internal).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index 897199ee..0c8251b8 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index b8682a46..3de6e7a6 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_amqplain).
diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl
index acbb6e48..64b01d8e 100644
--- a/src/rabbit_auth_mechanism_cr_demo.erl
+++ b/src/rabbit_auth_mechanism_cr_demo.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_cr_demo).
diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl
index 2448acb6..19fb5875 100644
--- a/src/rabbit_auth_mechanism_plain.erl
+++ b/src/rabbit_auth_mechanism_plain.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism_plain).
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index c3b322ee..364eb8f6 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index c61184a6..7b00fa5f 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue_qc).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index b116821c..b8211d43 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
@@ -139,7 +139,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 494f3203..d69376fb 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index f3ca4e98..5f0016b6 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 655bbb73..bb44797e 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binding).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f14b2973..a101886f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -111,7 +113,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -185,10 +191,11 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -244,7 +251,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -284,29 +296,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -315,6 +317,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -327,9 +333,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -527,7 +534,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -565,8 +572,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -672,45 +678,36 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -730,7 +727,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -787,9 +785,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -960,7 +957,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -1068,19 +1066,26 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1111,10 +1116,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1145,31 +1147,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1266,18 +1249,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1312,7 +1323,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1363,22 +1375,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1396,15 +1410,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
@@ -1420,13 +1429,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1506,26 +1514,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index a19b6bfd..dc262b49 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup).
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index e2561c80..995c41fb 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel_sup_sup).
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index 4ba01b4f..c508f1b9 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_client_sup).
diff --git a/src/rabbit_command_assembler.erl b/src/rabbit_command_assembler.erl
index a0953eab..adf6e417 100644
--- a/src/rabbit_command_assembler.erl
+++ b/src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index b2aba2ee..12a532b6 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_connection_sup).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 22b57b1a..6a775adf 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -11,13 +11,13 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5, diagnostics/1]).
+-export([start/0, stop/0, action/5]).
-define(RPC_TIMEOUT, infinity).
-define(EXTERNAL_CHECK_INTERVAL, 1000).
@@ -49,7 +49,6 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
--spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -67,7 +66,7 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Opts1 = [case K of
- ?NODE_OPT -> {?NODE_OPT, rabbit_misc:makenode(V)};
+ ?NODE_OPT -> {?NODE_OPT, rabbit_nodes:make(V)};
_ -> {K, V}
end || {K, V} <- Opts],
Command = list_to_atom(Command0),
@@ -143,26 +142,7 @@ print_report0(Node, {Module, InfoFun, KeysFun}, VHostArg) ->
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
-
-diagnostics(Node) ->
- {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- [{"diagnostics:", []},
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- {"- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]};
- {ok, NamePorts} ->
- {"- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]]}
- end,
- {"- current node: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
- Other -> {"- no current node home dir: ~p", [Other]}
- end,
- {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}].
+ fmt_stderr(rabbit_nodes:diagnostics([Node]), []).
stop() ->
ok.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 6f9a4650..e2928cae 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_direct).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 6e29ace7..f1672f4e 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger).
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7b6e07c1..042ab23c 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_error_logger_file_h).
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 5ae40c78..4ec141cf 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_event).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 68c0d988..83e28c44 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange).
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index ab3d00dc..44a08e24 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index b485e31f..4bce42d4 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_direct).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 3c029722..cc3fb87c 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_fanout).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index f09e4aae..de9979b4 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_headers).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 91c7b5d3..84f4f8a9 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type_topic).
@@ -250,7 +250,7 @@ remove_all(Table, Pattern) ->
mnesia:match_object(Table, Pattern, write)).
new_node_id() ->
- rabbit_guid:guid().
+ rabbit_guid:gen().
split_topic_key(Key) ->
split_topic_key(Key, [], []).
@@ -263,4 +263,3 @@ split_topic_key(<<$., Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [], [lists:reverse(RevWordAcc) | RevResAcc]);
split_topic_key(<<C:8, Rest/binary>>, RevWordAcc, RevResAcc) ->
split_topic_key(Rest, [C | RevWordAcc], RevResAcc).
-
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 5cb8e7b6..59df14f3 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_file).
diff --git a/src/rabbit_framing.erl b/src/rabbit_framing.erl
index da1a6a49..a79188ab 100644
--- a/src/rabbit_framing.erl
+++ b/src/rabbit_framing.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% TODO auto-generate
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2d0f5014..f4c425ca 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_guid).
@@ -19,7 +19,7 @@
-behaviour(gen_server).
-export([start_link/0]).
--export([guid/0, string_guid/1, binstring_guid/1]).
+-export([gen/0, gen_secure/0, string/2, binary/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -38,9 +38,10 @@
-type(guid() :: binary()).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(guid/0 :: () -> guid()).
--spec(string_guid/1 :: (any()) -> string()).
--spec(binstring_guid/1 :: (any()) -> binary()).
+-spec(gen/0 :: () -> guid()).
+-spec(gen_secure/0 :: () -> guid()).
+-spec(string/2 :: (guid(), any()) -> string()).
+-spec(binary/2 :: (guid(), any()) -> binary()).
-endif.
@@ -65,11 +66,8 @@ update_disk_serial() ->
end,
Serial.
-%% generate a GUID.
-%%
-%% The id is only unique within a single cluster and as long as the
-%% serial store hasn't been deleted.
-guid() ->
+%% Generate an un-hashed guid.
+fresh() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
%% restart, or ids were generated at a high rate (which causes
@@ -78,29 +76,74 @@ guid() ->
%%
%% A persisted serial number, the node, and a unique reference
%% (per node incarnation) uniquely identifies a process in space
- %% and time. We combine that with a process-local counter to give
- %% us a GUID.
- G = case get(guid) of
- undefined -> Serial = gen_server:call(?SERVER, serial, infinity),
- {{Serial, node(), make_ref()}, 0};
+ %% and time.
+ Serial = gen_server:call(?SERVER, serial, infinity),
+ {Serial, node(), make_ref()}.
+
+advance_blocks({B1, B2, B3, B4}, I) ->
+ %% To produce a new set of blocks, we create a new 32bit block
+ %% hashing {B5, I}. The new hash is used as last block, and the
+ %% other three blocks are XORed with it.
+ %%
+ %% Doing this is convenient because it avoids cascading conflits,
+ %% while being very fast. The conflicts are avoided by propagating
+ %% the changes through all the blocks at each round by XORing, so
+ %% the only occasion in which a collision will take place is when
+ %% all 4 blocks are the same and the counter is the same.
+ %%
+ %% The range (2^32) is provided explicitly since phash uses 2^27
+ %% by default.
+ B5 = erlang:phash2({B1, I}, 4294967296),
+ {{(B2 bxor B5), (B3 bxor B5), (B4 bxor B5), B5}, I+1}.
+
+blocks_to_binary({B1, B2, B3, B4}) -> <<B1:32, B2:32, B3:32, B4:32>>.
+
+%% generate a GUID. This function should be used when performance is a
+%% priority and predictability is not an issue. Otherwise use
+%% gen_secure/0.
+gen() ->
+ %% We hash a fresh GUID with md5, split it in 4 blocks, and each
+ %% time we need a new guid we rotate them producing a new hash
+ %% with the aid of the counter. Look at the comments in
+ %% advance_blocks/2 for details.
+ {BS, I} = case get(guid) of
+ undefined -> <<B1:32, B2:32, B3:32, B4:32>> =
+ erlang:md5(term_to_binary(fresh())),
+ {{B1,B2,B3,B4}, 0};
+ {BS0, I0} -> advance_blocks(BS0, I0)
+ end,
+ put(guid, {BS, I}),
+ blocks_to_binary(BS).
+
+%% generate a non-predictable GUID.
+%%
+%% The id is only unique within a single cluster and as long as the
+%% serial store hasn't been deleted.
+%%
+%% If you are not concerned with predictability, gen/0 is faster.
+gen_secure() ->
+ %% Here instead of hashing once we hash the GUID and the counter
+ %% each time, so that the GUID is not predictable.
+ G = case get(guid_secure) of
+ undefined -> {fresh(), 0};
{S, I} -> {S, I+1}
end,
- put(guid, G),
+ put(guid_secure, G),
erlang:md5(term_to_binary(G)).
%% generate a readable string representation of a GUID.
%%
%% employs base64url encoding, which is safer in more contexts than
%% plain base64.
-string_guid(Prefix) ->
+string(G, Prefix) ->
Prefix ++ "-" ++ lists:foldl(fun ($\+, Acc) -> [$\- | Acc];
($\/, Acc) -> [$\_ | Acc];
($\=, Acc) -> Acc;
(Chr, Acc) -> [Chr | Acc]
- end, [], base64:encode_to_string(guid())).
+ end, [], base64:encode_to_string(G)).
-binstring_guid(Prefix) ->
- list_to_binary(string_guid(Prefix)).
+binary(G, Prefix) ->
+ list_to_binary(string(G, Prefix)).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 177ae868..80b4e768 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 8a08d4b6..9fa6213b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_limiter).
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e0957..a6b4eeb0 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_log).
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([log/3, log/4, info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -32,9 +31,15 @@
-ifdef(use_specs).
+-export_type([level/0]).
+
+-type(category() :: atom()).
+-type(level() :: 'info' | 'warning' | 'error').
+
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
+
+-spec(log/3 :: (category(), level(), string()) -> 'ok').
+-spec(log/4 :: (category(), level(), string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,84 +47,47 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
-
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
+log(Category, Level, Fmt, Args) when is_list(Args) ->
+ gen_server:cast(?SERVER, {log, Category, Level, Fmt, Args}).
-info(Fmt) ->
- gen_server:cast(?SERVER, {info, Fmt}).
-
-info(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {info, Fmt, Args}).
-
-warning(Fmt) ->
- gen_server:cast(?SERVER, {warning, Fmt}).
-
-warning(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {warning, Fmt, Args}).
-
-error(Fmt) ->
- gen_server:cast(?SERVER, {error, Fmt}).
-
-error(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {error, Fmt, Args}).
+info(Fmt) -> log(default, info, Fmt).
+info(Fmt, Args) -> log(default, info, Fmt, Args).
+warning(Fmt) -> log(default, warning, Fmt).
+warning(Fmt, Args) -> log(default, warning, Fmt, Args).
+error(Fmt) -> log(default, error, Fmt).
+error(Fmt, Args) -> log(default, error, Fmt, Args).
%%--------------------------------------------------------------------
-init([]) -> {ok, none}.
+init([]) ->
+ {ok, CatLevelList} = application:get_env(log_levels),
+ CatLevels = [{Cat, level(Level)} || {Cat, Level} <- CatLevelList],
+ {ok, orddict:from_list(CatLevels)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
-handle_cast({info, Fmt}, State) ->
- error_logger:info_msg(Fmt),
- {noreply, State};
-handle_cast({info, Fmt, Args}, State) ->
- error_logger:info_msg(Fmt, Args),
- {noreply, State};
-handle_cast({warning, Fmt}, State) ->
- error_logger:warning_msg(Fmt),
- {noreply, State};
-handle_cast({warning, Fmt, Args}, State) ->
- error_logger:warning_msg(Fmt, Args),
- {noreply, State};
-handle_cast({error, Fmt}, State) ->
- error_logger:error_msg(Fmt),
- {noreply, State};
-handle_cast({error, Fmt, Args}, State) ->
- error_logger:error_msg(Fmt, Args),
- {noreply, State};
+handle_cast({log, Category, Level, Fmt, Args}, CatLevels) ->
+ CatLevel = case orddict:find(Category, CatLevels) of
+ {ok, L} -> L;
+ error -> level(info)
+ end,
+ case level(Level) =< CatLevel of
+ false -> ok;
+ true -> (case Level of
+ info -> fun error_logger:info_msg/2;
+ warning -> fun error_logger:warning_msg/2;
+ error -> fun error_logger:error_msg/2
+ end)(Fmt, Args)
+ end,
+ {noreply, CatLevels};
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -132,3 +100,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%--------------------------------------------------------------------
+
+level(info) -> 3;
+level(warning) -> 2;
+level(error) -> 1;
+level(none) -> 0.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index c25a177b..f22ad874 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 8ed2bede..d0b5bab7 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_coordinator).
@@ -325,8 +325,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
true = link(GM),
GM
end,
- {ok, _TRef} =
- timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
+ ensure_gm_heartbeat(),
{ok, #state { q = Q,
gm = GM1,
monitors = dict:new(),
@@ -366,6 +365,11 @@ handle_cast({ensure_monitoring, Pids},
end, Monitors, Pids),
noreply(State #state { monitors = Monitors1 }).
+handle_info(send_gm_heartbeat, State = #state{gm = GM}) ->
+ gm:broadcast(GM, heartbeat),
+ ensure_gm_heartbeat(),
+ noreply(State);
+
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
death_fun = DeathFun }) ->
@@ -419,3 +423,6 @@ noreply(State) ->
reply(Reply, State) ->
{reply, Reply, State, hibernate}.
+
+ensure_gm_heartbeat() ->
+ erlang:send_after(?ONE_SECOND, self(), send_gm_heartbeat).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index f60562ef..64a4a737 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_master).
@@ -280,8 +280,10 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }.
-status(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
- BQ:status(BQS).
+status(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:status(BQS) ++
+ [ {mirror_seen, dict:size(State #state.seen_status)},
+ {mirror_senders, sets:size(State #state.known_senders)} ].
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index baebc52b..db7d8ecc 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_misc).
@@ -136,12 +136,16 @@ add_mirror(Queue, MirrorNode) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
- rabbit_log:info(
- "Adding mirror of queue ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
- {ok, _Pid} -> ok;
- _ -> Result
+ {ok, undefined} -> %% Already running
+ ok;
+ {ok, _Pid} ->
+ rabbit_log:info(
+ "Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, Result]),
+ ok;
+ _ ->
+ Result
end;
[_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8d69a108..9bf89bce 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave).
@@ -90,7 +90,7 @@
}).
start_link(Q) ->
- gen_server2:start_link(?MODULE, [Q], []).
+ gen_server2:start_link(?MODULE, Q, []).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
@@ -98,55 +98,61 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) ->
gen_server2:call(QPid, info, infinity).
-init([#amqqueue { name = QueueName } = Q]) ->
- process_flag(trap_exit, true), %% amqqueue_process traps exits too.
- {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
- receive {joined, GM} ->
- ok
- end,
+init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- {ok, MPid} =
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- %% ASSERTION
- [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node],
- MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {ok, QPid}
- end),
- erlang:monitor(process, MPid),
- ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [Self]),
- ok = rabbit_memory_monitor:register(
- Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = bq_init(BQ, Q, false),
- State = #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new(),
-
- synchronised = false
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] -> MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1 #amqqueue { slave_pids = MPids1 }),
+ {new, QPid};
+ [SPid] -> true = rabbit_misc:is_process_alive(SPid),
+ existing
+ end
+ end) of
+ {new, MPid} ->
+ process_flag(trap_exit, true), %% amqqueue_process traps exits too.
+ {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
+ receive {joined, GM} ->
+ ok
+ end,
+ erlang:monitor(process, MPid),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
+ ok = rabbit_memory_monitor:register(
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = bq_init(BQ, Q, false),
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
},
- rabbit_event:notify(queue_slave_created,
- infos(?CREATION_EVENT_KEYS, State)),
- ok = gm:broadcast(GM, request_length),
- {ok, State, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}};
+ existing ->
+ ignore
+ end.
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
@@ -207,8 +213,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -249,6 +259,10 @@ handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -446,7 +460,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -600,7 +614,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> credit_flow:peer_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl
index fc04ec79..8eacb1f3 100644
--- a/src/rabbit_mirror_queue_slave_sup.erl
+++ b/src/rabbit_mirror_queue_slave_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2010-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2010-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_mirror_queue_slave_sup).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0578cf7d..b6d38172 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -34,11 +34,11 @@
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
+-export([tcp_name/3]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
@@ -141,9 +141,6 @@
-spec(execute_mnesia_tx_with_tail/1 ::
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> node()).
--spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
--spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
@@ -155,6 +152,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -222,7 +220,7 @@ frame_error(MethodName, BinaryFields) ->
protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
amqp_error(Name, ExplanationFormat, Params, Method) ->
- Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ Explanation = format(ExplanationFormat, Params),
#amqp_error{name = Name, explanation = Explanation, method = Method}.
protocol_error(Name, ExplanationFormat, Params) ->
@@ -276,8 +274,7 @@ val({Type, Value}) ->
true -> "~s";
false -> "~w"
end,
- lists:flatten(io_lib:format("the value '" ++ ValFmt ++ "' of type '~s'",
- [Value, Type])).
+ format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]).
%% Normally we'd call mnesia:dirty_read/1 here, but that is quite
%% expensive due to general mnesia overheads (figuring out table types
@@ -320,8 +317,7 @@ r_arg(VHostPath, Kind, Table, Key) ->
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
- lists:flatten(io_lib:format("~s '~s' in vhost '~s'",
- [Kind, Name, VHostPath])).
+ format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]).
enable_cover() -> enable_cover(["."]).
@@ -337,7 +333,7 @@ enable_cover(Dirs) ->
end, ok, Dirs).
start_cover(NodesS) ->
- {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ {ok, _} = cover:start([rabbit_nodes:make(N) || N <- NodesS]),
ok.
report_cover() -> report_cover(["."]).
@@ -418,12 +414,25 @@ execute_mnesia_transaction(TxFun) ->
%% Making this a sync_transaction allows us to use dirty_read
%% elsewhere and get a consistent result even when that read
%% executes on a different node.
- case worker_pool:submit({mnesia, sync_transaction, [TxFun]}) of
- {atomic, Result} -> Result;
- {aborted, Reason} -> throw({error, Reason})
+ case worker_pool:submit(
+ fun () ->
+ case mnesia:is_transaction() of
+ false -> DiskLogBefore = mnesia_dumper:get_log_writes(),
+ Res = mnesia:sync_transaction(TxFun),
+ DiskLogAfter = mnesia_dumper:get_log_writes(),
+ case DiskLogAfter == DiskLogBefore of
+ true -> Res;
+ false -> {sync, Res}
+ end;
+ true -> mnesia:sync_transaction(TxFun)
+ end
+ end) of
+ {sync, {atomic, Result}} -> mnesia_sync:sync(), Result;
+ {sync, {aborted, Reason}} -> throw({error, Reason});
+ {atomic, Result} -> Result;
+ {aborted, Reason} -> throw({error, Reason})
end.
-
%% Like execute_mnesia_transaction/1 with additional Pre- and Post-
%% commit function
execute_mnesia_transaction(TxFun, PrePostCommitFun) ->
@@ -450,29 +459,10 @@ execute_mnesia_tx_with_tail(TxFun) ->
ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
-makenode({Prefix, Suffix}) ->
- list_to_atom(lists:append([Prefix, "@", Suffix]));
-makenode(NodeStr) ->
- makenode(nodeparts(NodeStr)).
-
-nodeparts(Node) when is_atom(Node) ->
- nodeparts(atom_to_list(Node));
-nodeparts(NodeStr) ->
- case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
- {Prefix, []} -> {_, Suffix} = nodeparts(node()),
- {Prefix, Suffix};
- {Prefix, Suffix} -> {Prefix, tl(Suffix)}
- end.
-
-cookie_hash() ->
- base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
-
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
list_to_atom(
- lists:flatten(
- io_lib:format("~w_~s:~w",
- [Prefix, inet_parse:ntoa(IPAddress), Port]))).
+ format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])).
%% This is a modified version of Luke Gorrie's pmap -
%% http://lukego.livejournal.com/6753.html - that doesn't care about
@@ -541,6 +531,8 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)).
+
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -636,7 +628,7 @@ pid_to_string(Pid) when is_pid(Pid) ->
<<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
= term_to_binary(Pid),
Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
- lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])).
+ format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
string_to_pid(Str) ->
@@ -728,13 +720,14 @@ gb_trees_foreach(Fun, Tree) ->
%% [{"-q",true},{"-p","/"}]}
get_options(Defs, As) ->
lists:foldl(fun(Def, {AsIn, RsIn}) ->
- {AsOut, Value} = case Def of
- {flag, Key} ->
- get_flag(Key, AsIn);
- {option, Key, Default} ->
- get_option(Key, Default, AsIn)
- end,
- {AsOut, [{Key, Value} | RsIn]}
+ {K, {AsOut, V}} =
+ case Def of
+ {flag, Key} ->
+ {Key, get_flag(Key, AsIn)};
+ {option, Key, Default} ->
+ {Key, get_option(Key, Default, AsIn)}
+ end,
+ {AsOut, [{K, V} | RsIn]}
end, {As, []}, Defs).
get_option(K, _Default, [K, V | As]) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index bf997a6f..4d419fd9 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
@@ -23,8 +23,8 @@
empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
create_cluster_nodes_config/1, read_cluster_nodes_config/0,
record_running_nodes/0, read_previously_running_nodes/0,
- delete_previously_running_nodes/0, running_nodes_filename/0,
- is_disc_node/0, on_node_down/1, on_node_up/1]).
+ running_nodes_filename/0, is_disc_node/0, on_node_down/1,
+ on_node_up/1]).
-export([table_names/0]).
@@ -64,7 +64,6 @@
-spec(read_cluster_nodes_config/0 :: () -> [node()]).
-spec(record_running_nodes/0 :: () -> 'ok').
-spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(delete_previously_running_nodes/0 :: () -> 'ok').
-spec(running_nodes_filename/0 :: () -> file:filename()).
-spec(is_disc_node/0 :: () -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
@@ -98,12 +97,13 @@ status() ->
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- ok = init_db(read_cluster_nodes_config(), true,
- fun maybe_upgrade_local_or_record_desired/0),
+ Nodes = read_cluster_nodes_config(),
+ ok = init_db(Nodes, should_be_disc_node(Nodes)),
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case - let's
%% make it so.
ok = global:sync(),
+ ok = delete_previously_running_nodes(),
ok.
is_db_empty() ->
@@ -174,8 +174,7 @@ cluster(ClusterNodes, Force) ->
%% Join the cluster
start_mnesia(),
try
- ok = init_db(ClusterNodes, Force,
- fun maybe_upgrade_local_or_record_desired/0),
+ ok = init_db(ClusterNodes, Force),
ok = create_cluster_nodes_config(ClusterNodes)
after
stop_mnesia()
@@ -501,6 +500,18 @@ delete_previously_running_nodes() ->
FileName, Reason}})
end.
+init_db(ClusterNodes, Force) ->
+ init_db(
+ ClusterNodes, Force,
+ fun () ->
+ case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ %% If we're just starting up a new node we won't have a
+ %% version
+ version_not_available -> ok = rabbit_version:record_desired()
+ end
+ end).
+
%% Take a cluster node config and create the right kind of node - a
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes. If Force is false, don't allow
@@ -509,20 +520,12 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
UClusterNodes = lists:usort(ClusterNodes),
ProperClusterNodes = UClusterNodes -- [node()],
case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
+ {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ProperClusterNodes,
+ "Mnesia could not connect to any disc nodes."}});
{ok, Nodes} ->
- case Force of
- false -> FailedClusterNodes = ProperClusterNodes -- Nodes,
- case FailedClusterNodes of
- [] -> ok;
- _ -> throw({error, {failed_to_cluster_with,
- FailedClusterNodes,
- "Mnesia could not connect "
- "to some nodes."}})
- end;
- true -> ok
- end,
- WantDiscNode = should_be_disc_node(ClusterNodes),
WasDiscNode = is_disc_node(),
+ WantDiscNode = should_be_disc_node(ClusterNodes),
%% We create a new db (on disk, or in ram) in the first
%% two cases and attempt to upgrade the in the other two
case {Nodes, WasDiscNode, WantDiscNode} of
@@ -572,14 +575,6 @@ init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
end.
-maybe_upgrade_local_or_record_desired() ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- version_not_available -> ok = rabbit_version:record_desired()
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -627,10 +622,9 @@ move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
{{Year, Month, Day}, {Hour, Minute, Second}} = erlang:universaltime(),
- BackupDir = lists:flatten(
- io_lib:format("~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
- [MnesiaDir,
- Year, Month, Day, Hour, Minute, Second])),
+ BackupDir = rabbit_misc:format(
+ "~s_~w~2..0w~2..0w~2..0w~2..0w~2..0w",
+ [MnesiaDir, Year, Month, Day, Hour, Minute, Second]),
case file:rename(MnesiaDir, BackupDir) of
ok ->
%% NB: we cannot use rabbit_log here since it may not have
@@ -738,16 +732,18 @@ reset(Force) ->
false -> ok
end,
Node = node(),
+ Nodes = all_clustered_nodes() -- [Node],
case Force of
true -> ok;
false ->
ensure_mnesia_dir(),
start_mnesia(),
- {Nodes, RunningNodes} =
+ RunningNodes =
try
- ok = init(),
- {all_clustered_nodes() -- [Node],
- running_clustered_nodes() -- [Node]}
+ %% Force=true here so that reset still works when clustered
+ %% with a node which is down
+ ok = init_db(read_cluster_nodes_config(), true),
+ running_clustered_nodes() -- [Node]
after
stop_mnesia()
end,
@@ -755,6 +751,10 @@ reset(Force) ->
rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
cannot_delete_schema)
end,
+ %% We need to make sure that we don't end up in a distributed
+ %% Erlang system with nodes while not being in an Mnesia cluster
+ %% with them. We don't handle that well.
+ [erlang:disconnect_node(N) || N <- Nodes],
ok = delete_cluster_nodes_config(),
%% remove persisted messages and any other garbage we find
ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index b7de27d4..f685b109 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e6a32b90..56265136 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store).
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -436,7 +437,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
- Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
+ Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -460,12 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId}).
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
+ credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -500,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -666,7 +674,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Server),
Clients = dict:from_list(
- [{CRef, {undefined, undefined}} || CRef <- ClientRefs1]),
+ [{CRef, {undefined, undefined, undefined}} ||
+ CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
%% recovered correctly.
true = case {FileSummaryRecovered, CleanShutdown} of
@@ -731,10 +740,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _MsgId} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7;
+ {read, _MsgId} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -755,7 +764,7 @@ prioritise_info(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
+handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
State = #msstate { dir = Dir,
index_state = IndexState,
index_module = IndexModule,
@@ -765,7 +774,7 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -789,11 +798,19 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
+ {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:peer_down(CPid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+handle_cast({write, CRef, MsgId, Flow},
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts,
+ clients = Clients }) ->
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
@@ -1204,10 +1221,10 @@ update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
- {undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
- State #msstate {
- cref_to_msg_ids = CTM1 }
+ {_CPid, undefined, _CloseFDsFun} -> State;
+ {_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
record_pending_confirm(CRef, MsgId, State) ->
@@ -1294,8 +1311,10 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
true -> case dict:fetch(Ref, ClientRefs) of
- {_MsgOnDiskFun, undefined} -> ok;
- {_MsgOnDiskFun, CloseFDsFun} -> ok = CloseFDsFun()
+ {_CPid, _MsgOnDiskFun, undefined} ->
+ ok;
+ {_CPid, _MsgOnDiskFun, CloseFDsFun} ->
+ ok = CloseFDsFun()
end;
false -> ok
end
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
index d6dc5568..9c31439f 100644
--- a/src/rabbit_msg_store_ets_index.erl
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_ets_index).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 77f1f04e..3b61ed0b 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_gc).
diff --git a/src/rabbit_msg_store_index.erl b/src/rabbit_msg_store_index.erl
index ef8b7cdf..2f36256c 100644
--- a/src/rabbit_msg_store_index.erl
+++ b/src/rabbit_msg_store_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_msg_store_index).
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index b944ec81..02889b93 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_net).
@@ -19,7 +19,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
- sockname/1, peername/1, peercert/1]).
+ sockname/1, peername/1, peercert/1, connection_string/2]).
%%---------------------------------------------------------------------------
@@ -62,6 +62,8 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
+-spec(connection_string/2 ::
+ (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
-endif.
@@ -141,3 +143,19 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
+
+connection_string(Sock, Direction) ->
+ {From, To} = case Direction of
+ inbound -> {fun peername/1, fun sockname/1};
+ outbound -> {fun sockname/1, fun peername/1}
+ end,
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, rabbit_misc:format("~s:~p -> ~s:~p",
+ [rabbit_misc:ntoab(FromAddress), FromPort,
+ rabbit_misc:ntoab(ToAddress), ToPort])};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index e81f8134..825d1bb1 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_networking).
@@ -164,8 +164,6 @@ ssl_transform_fun(SslOpts) ->
fun (Sock) ->
case catch ssl:ssl_accept(Sock, SslOpts, ?SSL_TIMEOUT * 1000) of
{ok, SslSock} ->
- rabbit_log:info("upgraded TCP connection ~p to SSL~n",
- [self()]),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
{error, Reason} ->
{error, {ssl_upgrade_error, Reason}};
@@ -220,7 +218,12 @@ start_listener(Listener, Protocol, Label, OnConnect) ->
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
- {ok,_} = supervisor:start_child(rabbit_sup, Spec).
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
[stop_tcp_listener0(Address) ||
@@ -266,6 +269,16 @@ start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
+
+ %% In the event that somebody floods us with connections, the
+ %% reader processes can spew log events at error_logger faster
+ %% than it can keep up, causing its mailbox to grow unbounded
+ %% until we eat all the memory available and crash. So here is a
+ %% meaningless synchronous call to the underlying gen_event
+ %% mechanism. When it returns the mailbox is drained, and we
+ %% return to our caller to accept more connetions.
+ gen_event:which_handlers(error_logger),
+
Reader.
start_client(Sock) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 8aa24ab5..323cf0ce 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_node_monitor).
@@ -55,29 +55,32 @@ notify_cluster() ->
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
%% register other active rabbits with this rabbit
- [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
+ [ rabbit_running_on(N) || N <- Nodes ],
ok.
%%--------------------------------------------------------------------
init([]) ->
- {ok, no_state}.
+ {ok, ordsets:new()}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, State) ->
- rabbit_log:info("rabbit on ~p up~n", [Node]),
- erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
- {noreply, State};
+handle_cast({rabbit_running_on, Node}, Nodes) ->
+ case ordsets:is_element(Node, Nodes) of
+ true -> {noreply, Nodes};
+ false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ erlang:monitor(process, {rabbit, Node}),
+ ok = handle_live_rabbit(Node),
+ {noreply, ordsets:add_element(Node, Nodes)}
+ end;
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
- rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+ rabbit_log:info("rabbit on node ~p down~n", [Node]),
ok = handle_dead_rabbit(Node),
- {noreply, State};
+ {noreply, ordsets:del_element(Node, Nodes)};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
new file mode 100644
index 00000000..329c07dc
--- /dev/null
+++ b/src/rabbit_nodes.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_nodes).
+
+-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0]).
+
+-define(EPMD_TIMEOUT, 30000).
+
+%%----------------------------------------------------------------------------
+%% Specs
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(names/1 :: (string()) -> rabbit_types:ok_or_error2(
+ [{string(), integer()}], term())).
+-spec(diagnostics/1 :: ([node()]) -> string()).
+-spec(make/1 :: ({string(), string()} | string()) -> node()).
+-spec(parts/1 :: (node() | string()) -> {string(), string()}).
+-spec(cookie_hash/0 :: () -> string()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+names(Hostname) ->
+ Self = self(),
+ process_flag(trap_exit, true),
+ Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
+ timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
+ Res = receive
+ {names, Names} -> Names;
+ {'EXIT', Pid, Reason} -> {error, Reason}
+ end,
+ process_flag(trap_exit, false),
+ Res.
+
+diagnostics(Nodes) ->
+ Hosts = lists:usort([element(2, parts(Node)) || Node <- Nodes]),
+ NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
+ "nodes in question: ~p~n~n"
+ "hosts, their running nodes and ports:", [Nodes]}] ++
+ [diagnostics_host(Host) || Host <- Hosts] ++
+ diagnostics0(),
+ lists:flatten([io_lib:format(F ++ "~n", A) || NodeDiag <- NodeDiags,
+ {F, A} <- [NodeDiag]]).
+
+diagnostics0() ->
+ [{"~ncurrent node details:~n- node name: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- home dir: ~s", [Home]};
+ Other -> {"- no home dir: ~p", [Other]}
+ end,
+ {"- cookie hash: ~s", [cookie_hash()]}].
+
+diagnostics_host(Host) ->
+ case names(Host) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [Host, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- ~s: ~p",
+ [Host, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end.
+
+make({Prefix, Suffix}) -> list_to_atom(lists:append([Prefix, "@", Suffix]));
+make(NodeStr) -> make(parts(NodeStr)).
+
+parts(Node) when is_atom(Node) ->
+ parts(atom_to_list(Node));
+parts(NodeStr) ->
+ case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of
+ {Prefix, []} -> {_, Suffix} = parts(node()),
+ {Prefix, Suffix};
+ {Prefix, Suffix} -> {Prefix, tl(Suffix)}
+ end.
+
+cookie_hash() ->
+ base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index de2ba8ad..7b85ab15 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_plugins).
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 50444dc4..162d44f1 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_prelaunch).
@@ -22,7 +22,6 @@
-define(BaseApps, [rabbit]).
-define(ERROR_CODE, 1).
--define(EPMD_TIMEOUT, 30000).
%%----------------------------------------------------------------------------
%% Specs
@@ -244,16 +243,15 @@ duplicate_node_check([]) ->
%% Ignore running node while installing windows service
ok;
duplicate_node_check(NodeStr) ->
- Node = rabbit_misc:makenode(NodeStr),
- {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case names(NodeHost) of
+ Node = rabbit_nodes:make(NodeStr),
+ {NodeName, NodeHost} = rabbit_nodes:parts(Node),
+ case rabbit_nodes:names(NodeHost) of
{ok, NamePorts} ->
case proplists:is_defined(NodeName, NamePorts) of
true -> io:format("node with name ~p "
"already running on ~p~n",
[NodeName, NodeHost]),
- [io:format(Fmt ++ "~n", Args) ||
- {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ io:format(rabbit_nodes:diagnostics([Node]) ++ "~n"),
terminate(?ERROR_CODE);
false -> ok
end;
@@ -279,15 +277,3 @@ terminate(Status) ->
after infinity -> ok
end
end.
-
-names(Hostname) ->
- Self = self(),
- process_flag(trap_exit, true),
- Pid = spawn_link(fun () -> Self ! {names, net_adm:names(Hostname)} end),
- timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- Res = receive
- {names, Names} -> Names;
- {'EXIT', Pid, Reason} -> {error, Reason}
- end,
- process_flag(trap_exit, false),
- Res.
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 9b45e798..df957d88 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_collector).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index f03c1d1c..4c8793f1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_queue_index).
@@ -491,7 +491,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
- lists:flatten(io_lib:format("~.36B", [Num])).
+ rabbit_misc:format("~.36B", [Num]).
queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fce61129..01242e81 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_reader).
@@ -27,8 +27,6 @@
-export([conserve_memory/2, server_properties/1]).
--export([process_channel_frame/5]). %% used by erlang-client
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
@@ -40,10 +38,12 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, last_blocked_by, last_blocked_age,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -90,10 +90,6 @@
-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
--spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
-
-endif.
%%--------------------------------------------------------------------------
@@ -177,25 +173,26 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
server_capabilities(_) ->
[].
+log(Level, Fmt, Args) -> rabbit_log:log(connection, Level, Fmt, Args).
+
inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
socket_op(Sock, Fun) ->
case Fun(Sock) of
{ok, Res} -> Res;
- {error, Reason} -> rabbit_log:error("error on TCP connection ~p:~p~n",
- [self(), Reason]),
- rabbit_log:info("closing TCP connection ~p~n",
- [self()]),
+ {error, Reason} -> log(error, "error on AMQP connection ~p: ~p~n",
+ [self(), Reason]),
exit(normal)
end.
start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
Sock, SockTransform) ->
process_flag(trap_exit, true),
- {PeerAddress, PeerPort} = socket_op(Sock, fun rabbit_net:peername/1),
- PeerAddressS = rabbit_misc:ntoab(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ ConnStr = socket_op(Sock, fun (Sock0) ->
+ rabbit_net:connection_string(
+ Sock0, inbound)
+ end),
+ log(info, "accepting AMQP connection ~p (~s)~n", [self(), ConnStr]),
ClientSock = socket_op(Sock, SockTransform),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
@@ -220,21 +217,22 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ conserve_memory = false,
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
- handshake, 8))
+ handshake, 8)),
+ log(info, "closing AMQP connection ~p (~s)~n", [self(), ConnStr])
catch
- Ex -> (if Ex == connection_closed_abruptly ->
- fun rabbit_log:warning/2;
- true ->
- fun rabbit_log:error/2
- end)("exception on TCP connection ~p from ~s:~p~n~p~n",
- [self(), PeerAddressS, PeerPort, Ex])
+ Ex -> log(case Ex of
+ connection_closed_abruptly -> warning;
+ _ -> error
+ end, "closing AMQP connection ~p (~s):~n~p~n",
+ [self(), ConnStr, Ex])
after
- rabbit_log:info("closing TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
%% We don't close the socket explicitly. The reader is the
%% controlling process and hence its termination will close
%% the socket. Furthermore, gen_tcp:close/1 waits for pending
@@ -267,21 +265,20 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
- closed -> if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
+ closed -> case State#v1.connection_state of
+ closed -> State;
+ _ -> throw(connection_closed_abruptly)
end;
{error, Reason} -> throw({inet_error, Reason});
{other, Other} -> handle_other(Other, Deb, State)
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(State));
+ mainloop(Deb, maybe_close(control_throttle(State)));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -341,6 +338,9 @@ handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -355,17 +355,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ case {CS, Mem orelse credit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
+maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -387,17 +400,19 @@ handle_dependent_exit(ChPid, Reason, State) ->
{undefined, uncontrolled} ->
exit({abnormal_dependent_exit, ChPid, Reason});
{_Channel, controlled} ->
- maybe_close(State);
+ maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
- [self(), Channel, Reason]),
- maybe_close(handle_exception(State, Channel, Reason))
+ log(error, "AMQP connection ~p, channel ~p - error:~n~p~n",
+ [self(), Channel, Reason]),
+ maybe_close(handle_exception(control_throttle(State),
+ Channel, Reason))
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel
@@ -432,9 +447,10 @@ wait_for_channel_termination(N, TimerRef) ->
{_Channel, controlled} ->
wait_for_channel_termination(N-1, TimerRef);
{Channel, uncontrolled} ->
- rabbit_log:error("connection ~p, channel ~p - "
- "error while terminating:~n~p~n",
- [self(), Channel, Reason]),
+ log(error,
+ "AMQP connection ~p, channel ~p - "
+ "error while terminating:~n~p~n",
+ [self(), Channel, Reason]),
wait_for_channel_termination(N-1, TimerRef)
end;
cancel_wait ->
@@ -483,43 +499,38 @@ handle_frame(Type, Channel, Payload,
case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- AnalyzedFrame ->
- case get({channel, Channel}) of
- {ChPid, FramingState} ->
- NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
- put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
- undefined ->
- case ?IS_RUNNING(State) of
- true -> send_to_new_channel(
- Channel, AnalyzedFrame, State);
- false -> throw({channel_frame_while_starting,
- Channel, State#v1.connection_state,
- AnalyzedFrame})
- end
- end
+ AnalyzedFrame -> process_frame(AnalyzedFrame, Channel, State)
+ end.
+
+process_frame(Frame, Channel, State) ->
+ case get({channel, Channel}) of
+ {ChPid, AState} ->
+ case process_channel_frame(Frame, ChPid, AState) of
+ {ok, NewAState} -> put({channel, Channel}, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {error, Reason} -> handle_exception(State, Channel, Reason)
+ end;
+ undefined when ?IS_RUNNING(State) ->
+ ok = create_channel(Channel, State),
+ process_frame(Frame, Channel, State);
+ undefined ->
+ throw({channel_frame_while_starting,
+ Channel, State#v1.connection_state, Frame})
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ control_throttle(State);
post_process_frame({method, MethodName, _}, _ChPid,
State = #v1{connection = #connection{
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> ok = rabbit_heartbeat:pause_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = blocked};
- _ -> State
- end;
- false -> State
+ maybe_block(control_throttle(State));
+ false -> control_throttle(State)
end;
post_process_frame(_Frame, _ChPid, State) ->
- State.
+ control_throttle(State).
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
ensure_stats_timer(
@@ -687,10 +698,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -822,6 +834,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->
@@ -877,7 +895,7 @@ cert_info(F, Sock) ->
%%--------------------------------------------------------------------------
-send_to_new_channel(Channel, AnalyzedFrame, State) ->
+create_channel(Channel, State) ->
#v1{sock = Sock, queue_collector = Collector,
channel_sup_sup_pid = ChanSupSup,
connection = #connection{protocol = Protocol,
@@ -890,23 +908,19 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
- put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
- State.
+ put({channel, Channel}, {ChPid, AState}),
+ ok.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} -> NewAState;
+ {ok, NewAState} -> {ok, NewAState};
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
- NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
- Method, Content),
- NewAState;
- {error, Reason} -> ErrPid ! {channel_exit, Channel,
- Reason},
- AState
+ {ok, NewAState};
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
+ {ok, NewAState};
+ {error, Reason} -> {error, Reason}
end.
handle_exception(State = #v1{connection_state = closed}, _Channel, _Reason) ->
diff --git a/src/rabbit_registry.erl b/src/rabbit_registry.erl
index 9821ae7b..8c0ebcbe 100644
--- a/src/rabbit_registry.erl
+++ b/src/rabbit_registry.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_registry).
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index 1a08efed..237ab78c 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_restartable_sup).
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 219833b7..f4bbda0f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_router).
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 963294d9..e8beecfe 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sasl_report_file_h).
diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl
index e524446e..3025d981 100644
--- a/src/rabbit_ssl.erl
+++ b/src/rabbit_ssl.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_ssl).
@@ -72,9 +72,8 @@ peer_cert_validity(Cert) ->
cert_info(fun(#'OTPCertificate' {
tbsCertificate = #'OTPTBSCertificate' {
validity = {'Validity', Start, End} }}) ->
- lists:flatten(
- io_lib:format("~s - ~s", [format_asn1_value(Start),
- format_asn1_value(End)]))
+ rabbit_misc:format("~s - ~s", [format_asn1_value(Start),
+ format_asn1_value(End)])
end, Cert).
%%--------------------------------------------------------------------------
@@ -155,8 +154,7 @@ escape_rdn_value([C | S], middle) when C < 32 ; C >= 126 ->
%% purposes it's handy to escape all non-printable chars. All non-ASCII
%% characters get converted to UTF-8 sequences and then escaped. We've
%% already got a UTF-8 sequence here, so just escape it.
- lists:flatten(io_lib:format("\\~2.16.0B", [C])) ++
- escape_rdn_value(S, middle);
+ rabbit_misc:format("\\~2.16.0B", [C]) ++ escape_rdn_value(S, middle);
escape_rdn_value([C | S], middle) ->
[C | escape_rdn_value(S, middle)].
diff --git a/src/rabbit_sup.erl b/src/rabbit_sup.erl
index 802ea5e2..0965e3b3 100644
--- a/src/rabbit_sup.erl
+++ b/src/rabbit_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_sup).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9afb95b9..433ed9cb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests).
@@ -59,7 +59,7 @@ all_tests() ->
passed.
maybe_run_cluster_dependent_tests() ->
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = run_cluster_dependent_tests(SecondaryNode);
@@ -71,10 +71,13 @@ maybe_run_cluster_dependent_tests() ->
run_cluster_dependent_tests(SecondaryNode) ->
SecondaryNodeS = atom_to_list(SecondaryNode),
+ cover:stop(SecondaryNode),
ok = control_action(stop_app, []),
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
+ cover:start(SecondaryNode),
+ ok = control_action(start_app, SecondaryNode, [], []),
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
@@ -859,7 +862,7 @@ test_cluster_management() ->
"invalid2@invalid"]),
ok = assert_ram_node(),
- SecondaryNode = rabbit_misc:makenode("hare"),
+ SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
pong -> passed = test_cluster_management2(SecondaryNode);
pang -> io:format("Skipping clustering tests with node ~p~n",
@@ -889,6 +892,14 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
ok = assert_ram_node(),
+ %% ram node will not start by itself
+ ok = control_action(stop_app, []),
+ ok = control_action(stop_app, SecondaryNode, [], []),
+ {error, _} = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], []),
+ ok = control_action(start_app, []),
+ ok = control_action(stop_app, []),
+
%% change cluster config while remaining in same cluster
ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
ok = control_action(start_app, []),
@@ -897,8 +908,7 @@ test_cluster_management2(SecondaryNode) ->
%% join non-existing cluster as a ram node
ok = control_action(force_cluster, ["invalid1@invalid",
"invalid2@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
+ {error, _} = control_action(start_app, []),
ok = assert_ram_node(),
%% join empty cluster as a ram node (converts to disc)
@@ -953,7 +963,9 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
+ cover:stop(SecondaryNode),
ok = control_action(reset, []),
+ cover:start(SecondaryNode),
%% attempt to leave cluster when no other node is alive
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
@@ -970,7 +982,15 @@ test_cluster_management2(SecondaryNode) ->
%% leave system clustered, with the secondary node as a ram node
ok = control_action(force_reset, []),
ok = control_action(start_app, []),
- ok = control_action(force_reset, SecondaryNode, [], []),
+ %% Yes, this is rather ugly. But since we're a clustered Mnesia
+ %% node and we're telling another clustered node to reset itself,
+ %% we will get disconnected half way through causing a
+ %% badrpc. This never happens in real life since rabbitmqctl is
+ %% not a clustered Mnesia node.
+ cover:stop(SecondaryNode),
+ {badrpc, nodedown} = control_action(force_reset, SecondaryNode, [], []),
+ pong = net_adm:ping(SecondaryNode),
+ cover:start(SecondaryNode),
ok = control_action(cluster, SecondaryNode, [NodeS], []),
ok = control_action(start_app, SecondaryNode, [], []),
@@ -1779,10 +1799,10 @@ test_msg_store() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref),
- Ref2 = rabbit_guid:guid(),
+ Ref2 = rabbit_guid:gen(),
{Cap2, MSC2State} = msg_store_client_init_capture(
?PERSISTENT_MSG_STORE, Ref2),
%% check we don't contain any of the msgs we're about to publish
@@ -1934,7 +1954,7 @@ test_msg_store_confirms(MsgIds, Cap, MSCState) ->
passed.
test_msg_store_confirm_timer() ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
MSCState = rabbit_msg_store:client_init(
@@ -1963,7 +1983,7 @@ msg_store_keep_busy_until_confirm(MsgIds, MSCState) ->
test_msg_store_client_delete_and_terminate() ->
restart_msg_store_empty(),
MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
%% test the 'dying client' fast path for writes
@@ -1979,7 +1999,7 @@ test_queue() ->
init_test_queue() ->
TestQueue = test_queue(),
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
- PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
+ PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:gen()),
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
@@ -2013,7 +2033,7 @@ restart_app() ->
rabbit:start().
queue_index_publish(SeqIds, Persistent, Qi) ->
- Ref = rabbit_guid:guid(),
+ Ref = rabbit_guid:gen(),
MsgStore = case Persistent of
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
@@ -2022,7 +2042,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
{A, B = [{_SeqId, LastMsgIdWritten} | _]} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsMsgIdsAcc}) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
QiM = rabbit_queue_index:publish(
MsgId, SeqId, #message_properties{}, Persistent, QiN),
ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
@@ -2045,7 +2065,7 @@ verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
test_queue_index_props() ->
with_empty_test_queue(
fun(Qi0) ->
- MsgId = rabbit_guid:guid(),
+ MsgId = rabbit_guid:gen(),
Props = #message_properties{expiry=12345},
Qi1 = rabbit_queue_index:publish(MsgId, 1, Props, true, Qi0),
{[{MsgId, 1, Props, _, _}], Qi2} =
@@ -2222,14 +2242,26 @@ test_amqqueue(Durable) ->
#amqqueue { durable = Durable }.
with_fresh_variable_queue(Fun) ->
- ok = empty_test_queue(),
- VQ = variable_queue_init(test_amqqueue(true), false),
- S0 = rabbit_variable_queue:status(VQ),
- assert_props(S0, [{q1, 0}, {q2, 0},
- {delta, {delta, undefined, 0, undefined}},
- {q3, 0}, {q4, 0},
- {len, 0}]),
- _ = rabbit_variable_queue:delete_and_terminate(shutdown, Fun(VQ)),
+ Ref = make_ref(),
+ Me = self(),
+ %% Run in a separate process since rabbit_msg_store will send
+ %% bump_credit messages and we want to ignore them
+ spawn_link(fun() ->
+ ok = empty_test_queue(),
+ VQ = variable_queue_init(test_amqqueue(true), false),
+ S0 = rabbit_variable_queue:status(VQ),
+ assert_props(S0, [{q1, 0}, {q2, 0},
+ {delta,
+ {delta, undefined, 0, undefined}},
+ {q3, 0}, {q4, 0},
+ {len, 0}]),
+ _ = rabbit_variable_queue:delete_and_terminate(
+ shutdown, Fun(VQ)),
+ Me ! Ref
+ end),
+ receive
+ Ref -> ok
+ end,
passed.
publish_and_confirm(Q, Payload, Count) ->
diff --git a/src/rabbit_tests_event_receiver.erl b/src/rabbit_tests_event_receiver.erl
index abcbe0b6..72c07b51 100644
--- a/src/rabbit_tests_event_receiver.erl
+++ b/src/rabbit_tests_event_receiver.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_tests_event_receiver).
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 58079ccf..3a5b96de 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_trace).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index ae2b5d3f..732c29b6 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_types).
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 717d94a8..80f50b38 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade).
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index f164035e..9f2535bd 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_upgrade_functions).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 63a0927f..52eb168a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_variable_queue).
@@ -434,7 +434,7 @@ init(#amqqueue { name = QueueName, durable = true }, true,
Terms = rabbit_queue_index:shutdown_terms(QueueName),
{PRef, Terms1} =
case proplists:get_value(persistent_ref, Terms) of
- undefined -> {rabbit_guid:guid(), []};
+ undefined -> {rabbit_guid:gen(), []};
PRef1 -> {PRef1, Terms}
end,
PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
@@ -860,7 +860,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
Res.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
- msg_store_client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun, Callback).
+ msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
+ Callback).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
@@ -870,17 +871,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index f6bcbb7f..7545d813 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_version).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 38bb76b0..5548ef6d 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_vhost).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4..dc74b2f5 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_writer).
@@ -129,6 +129,9 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
ok = internal_send_command_async(MethodRecord, Content, State),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
+handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ State;
handle_message({inet_reply, _, ok}, State) ->
State;
handle_message({inet_reply, _, Status}, _State) ->
@@ -169,12 +172,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 26ea502c..a2f4fae9 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -41,7 +41,7 @@
%% 5) normal, and {shutdown, _} exit reasons are all treated the same
%% (i.e. are regarded as normal exits)
%%
-%% All modifications are (C) 2010-2011 VMware, Inc.
+%% All modifications are (C) 2010-2012 VMware, Inc.
%%
%% %CopyrightBegin%
%%
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index 8678c2c9..43a6bc99 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor).
@@ -54,28 +54,9 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- try
- %% report
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
- {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [rabbit_misc:ntoab(Address), Port,
- rabbit_misc:ntoab(PeerAddress), PeerPort]),
- %% In the event that somebody floods us with connections we can spew
- %% the above message at error_logger faster than it can keep up.
- %% So error_logger's mailbox grows unbounded until we eat all the
- %% memory available and crash. So here's a meaningless synchronous call
- %% to the underlying gen_event mechanism - when it returns the mailbox
- %% is drained.
- gen_event:which_handlers(error_logger),
- %% handle
- file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
- ok = file_handle_cache:obtain()
- catch {inet_error, Reason} ->
- gen_tcp:close(Sock),
- error_logger:error_msg("unable to accept TCP connection: ~p~n",
- [Reason])
- end,
+ %% handle
+ file_handle_cache:transfer(apply(M, F, A ++ [Sock])),
+ ok = file_handle_cache:obtain(),
%% accept more
accept(State);
@@ -88,9 +69,12 @@ handle_info({inet_async, LSock, Ref, {error, closed}},
handle_info({inet_async, LSock, Ref, {error, Reason}},
State=#state{sock=LSock, ref=Ref}) ->
- {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {AddressS, Port} = case inet:sockname(LSock) of
+ {ok, {A, P}} -> {rabbit_misc:ntoab(A), P};
+ {error, _} -> {"unknown", unknown}
+ end,
error_logger:error_msg("failed to accept TCP connection on ~s:~p: ~p~n",
- [rabbit_misc:ntoab(Address), Port, Reason]),
+ [AddressS, Port, Reason]),
accept(State);
handle_info(_Info, State) ->
@@ -104,8 +88,6 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
-inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
-
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index cb3dd02c..d8844441 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_acceptor_sup).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 9a82ac88..fb01c792 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener).
@@ -72,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 74297b6d..9ee921b4 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(tcp_listener_sup).
diff --git a/src/test_sup.erl b/src/test_sup.erl
index 5feb146f..7f4b5049 100644
--- a/src/test_sup.erl
+++ b/src/test_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(test_sup).
diff --git a/src/vm_memory_monitor.erl b/src/vm_memory_monitor.erl
index 8973a4f7..fca55f02 100644
--- a/src/vm_memory_monitor.erl
+++ b/src/vm_memory_monitor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% In practice Erlang shouldn't be allowed to grow to more than a half
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index fcb07a16..c9ecccd6 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool).
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index d37c3a0f..ff356366 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_sup).
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index b42530e2..1ddcebb2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(worker_pool_worker).