summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-06-11 14:47:58 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-06-11 14:47:58 +0100
commit896d1dd084942ce6714b0628088c9a6761b8f017 (patch)
treedd840ffe46654cfa16746eabc3caf513fccd74d7
parent4204e3cbc8067062fb54885239a85e1e9af7c852 (diff)
parent794d53c6b884aa0de80d2a910b79992071fbe0bf (diff)
downloadrabbitmq-server-896d1dd084942ce6714b0628088c9a6761b8f017.tar.gz
Merging bug 18524 onto default
-rw-r--r--Makefile22
-rw-r--r--include/rabbit_backing_queue_spec.hrl6
-rw-r--r--include/rabbit_exchange_type_spec.hrl7
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf172
-rwxr-xr-xscripts/rabbitmq-multi3
-rw-r--r--scripts/rabbitmq-multi.bat6
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat6
-rw-r--r--scripts/rabbitmq-service.bat6
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--scripts/rabbitmqctl.bat6
-rw-r--r--src/rabbit_alarm.erl11
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl25
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_channel.erl221
-rw-r--r--src/rabbit_exchange.erl88
-rw-r--r--src/rabbit_exchange_type.erl12
-rw-r--r--src/rabbit_exchange_type_direct.erl10
-rw-r--r--src/rabbit_exchange_type_fanout.erl10
-rw-r--r--src/rabbit_exchange_type_headers.erl10
-rw-r--r--src/rabbit_exchange_type_registry.erl6
-rw-r--r--src/rabbit_exchange_type_topic.erl10
-rw-r--r--src/rabbit_tests.erl128
24 files changed, 527 insertions, 258 deletions
diff --git a/Makefile b/Makefile
index 99e0e50f..725f20a6 100644
--- a/Makefile
+++ b/Makefile
@@ -40,7 +40,7 @@ BASIC_PLT=basic.plt
RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
-# our type specs rely on features / bug fixes in dialyzer that are
+# our type specs rely on features and bug fixes in dialyzer that are
# only available in R13B01 upwards (R13B01 is eshell 5.7.2)
#
# NB: the test assumes that version number will only contain single digits
@@ -70,6 +70,12 @@ define usage_dep
$(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
endef
+ifneq "$(SBIN_DIR)" ""
+ifneq "$(TARGET_DIR)" ""
+SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
+endif
+endif
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -237,13 +243,7 @@ $(SOURCE_DIR)/%_usage.erl:
docs_all: $(MANPAGES) $(WEB_MANPAGES)
-install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
install: all docs_all install_dirs
- @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false)
- @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false)
- @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false)
-
- mkdir -p $(TARGET_DIR)
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
@@ -259,8 +259,14 @@ install: all docs_all install_dirs
done
install_dirs:
- mkdir -p $(SBIN_DIR)
+ @ OK=true && \
+ { [ -n "$(TARGET_DIR)" ] || { echo "Please set TARGET_DIR."; OK=false; }; } && \
+ { [ -n "$(SBIN_DIR)" ] || { echo "Please set SBIN_DIR."; OK=false; }; } && \
+ { [ -n "$(MAN_DIR)" ] || { echo "Please set MAN_DIR."; OK=false; }; } && $$OK
+
mkdir -p $(TARGET_DIR)/sbin
+ mkdir -p $(SBIN_DIR)
+ mkdir -p $(MAN_DIR)
$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index 1b536dfa..55cd126e 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 9864f1eb..cb564365 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -38,5 +38,6 @@
-spec(delete/2 :: (exchange(), list(binding())) -> 'ok').
-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
-endif.
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 97c58ea2..db0ed70b 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -35,21 +35,22 @@
##
## OCF instance parameters
-## OCF_RESKEY_multi
-## OCF_RESKEY_ctl
-## OCF_RESKEY_nodename
-## OCF_RESKEY_ip
-## OCF_RESKEY_port
-## OCF_RESKEY_cluster_config_file
-## OCF_RESKEY_config_file
-## OCF_RESKEY_log_base
-## OCF_RESKEY_mnesia_base
-## OCF_RESKEY_server_start_args
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
#######################################################################
# Initialization:
-. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+: ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/resource.d/heartbeat}
+. ${OCF_FUNCTIONS_DIR}/.ocf-shellfuncs
#######################################################################
@@ -63,7 +64,7 @@ OCF_RESKEY_log_base_default="/var/log/rabbitmq"
: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
meta_data() {
- cat <<END
+ cat <<END
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="rabbitmq-server">
@@ -113,7 +114,7 @@ The IP address for rabbitmq-server to listen on
The IP Port for rabbitmq-server to listen on
</longdesc>
<shortdesc lang="en">IP Port</shortdesc>
-<content type="string" default="" />
+<content type="integer" default="" />
</parameter>
<parameter name="cluster_config_file" unique="0" required="0">
@@ -161,7 +162,8 @@ Additional arguments provided to the server on startup
<actions>
<action name="start" timeout="600" />
<action name="stop" timeout="120" />
-<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="status" timeout="20" interval="10" />
+<action name="monitor" timeout="20" interval="10" />
<action name="validate-all" timeout="30" />
<action name="meta-data" timeout="5" />
</actions>
@@ -170,8 +172,8 @@ END
}
rabbit_usage() {
- cat <<END
-usage: $0 {start|stop|monitor|validate-all|meta-data}
+ cat <<END
+usage: $0 {start|stop|status|monitor|validate-all|meta-data}
Expects to have a fully populated OCF RA-compliant environment set.
END
@@ -202,35 +204,35 @@ export_vars() {
rabbit_validate_partial() {
if [ ! -x $RABBITMQ_MULTI ]; then
- ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -x $RABBITMQ_CTL ]; then
- ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
}
rabbit_validate_full() {
if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
- ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
- ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
rabbit_validate_partial
@@ -243,25 +245,26 @@ rabbit_status() {
$RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
rc=$?
case "$rc" in
- 0)
- return $OCF_SUCCESS
- ;;
- 2)
- return $OCF_NOT_RUNNING
- ;;
- *)
- ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
- return $OCF_ERR_GENERIC
+ 0)
+ ocf_log debug "RabbitMQ server is running normally"
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ ocf_log debug "RabbitMQ server is not running"
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ exit $OCF_ERR_GENERIC
esac
}
rabbit_start() {
local rc
- rabbit_validate_full
- rc=$?
- if [ "$rc" != $OCF_SUCCESS ]; then
- return $rc
+ if rabbit_status; then
+ ocf_log info "Resource already running."
+ return $OCF_SUCCESS
fi
export_vars
@@ -270,24 +273,23 @@ rabbit_start() {
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
- return $rc
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
fi
# Spin waiting for the server to come up.
# Let the CRM/LRM time us out if required
start_wait=1
while [ $start_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_SUCCESS ]; then
- start_wait=0
-
- elif [ "$rc" != $OCF_NOT_RUNNING ]; then
- ocf_log info "rabbitmq-server start failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -295,28 +297,34 @@ rabbit_start() {
rabbit_stop() {
local rc
+
+ if ! rabbit_status; then
+ ocf_log info "Resource not running."
+ return $OCF_SUCCESS
+ fi
+
$RABBITMQ_MULTI stop_all &
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
- return $rc
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
fi
# Spin waiting for the server to shut down.
# Let the CRM/LRM time us out if required
stop_wait=1
while [ $stop_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_NOT_RUNNING ]; then
- stop_wait=0
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
break
- elif [ "$rc" != $OCF_SUCCESS ]; then
- ocf_log info "rabbitmq-server stop failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -329,34 +337,38 @@ rabbit_monitor() {
case $__OCF_ACTION in
meta-data)
- meta_data
- exit $OCF_SUCCESS
- ;;
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
usage|help)
- rabbit_usage
- exit $OCF_SUCCESS
- ;;
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
esac
-rabbit_validate_partial || exit
+if ocf_is_probe; then
+ rabbit_validate_partial
+else
+ rabbit_validate_full
+fi
case $__OCF_ACTION in
start)
- rabbit_start
+ rabbit_start
;;
stop)
- rabbit_stop
+ rabbit_stop
;;
- monitor)
- rabbit_monitor
+ status|monitor)
+ rabbit_monitor
;;
validate-all)
exit $OCF_SUCCESS
- ;;
+ ;;
*)
- rabbit_usage
- exit $OCF_ERR_UNIMPLEMENTED
- ;;
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
esac
-exit $? \ No newline at end of file
+exit $?
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 8341d35c..59050692 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,7 +29,8 @@
##
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4b7f2e9..a4f8c8b4 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index ccdfc401..2261b56e 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 57fe1328..a290f935 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index a4021fd6..bd117b83 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -48,8 +48,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index cfb775eb..92e5312b 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
. `dirname $0`/rabbitmq-env
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 55572451..563b9e58 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -38,8 +38,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 7e96d9a3..53c713e6 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -47,7 +47,7 @@
-type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
+-spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
-endif.
@@ -67,9 +67,9 @@ stop() ->
ok = alarm_handler:delete_alarm_handler(?MODULE).
register(Pid, HighMemMFA) ->
- ok = gen_event:call(alarm_handler, ?MODULE,
- {register, Pid, HighMemMFA},
- infinity).
+ gen_event:call(alarm_handler, ?MODULE,
+ {register, Pid, HighMemMFA},
+ infinity).
%%----------------------------------------------------------------------------
@@ -84,7 +84,8 @@ handle_call({register, Pid, {M, F, A} = HighMemMFA},
false -> ok
end,
NewAlertees = dict:store(Pid, HighMemMFA, Alertess),
- {ok, ok, State#alarms{alertees = NewAlertees}};
+ {ok, State#alarms.vm_memory_high_watermark,
+ State#alarms{alertees = NewAlertees}};
handle_call(_Request, State) ->
{ok, not_understood, State}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1756640a..3c9c41bd 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -195,7 +195,8 @@ start_queue_process(Q) ->
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
RoutingKey = QueueName#resource.name,
- rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, []),
+ rabbit_exchange:add_binding(Exchange, QueueName, RoutingKey, [],
+ fun (_X, _Q) -> ok end),
ok.
lookup(Name) ->
@@ -267,7 +268,7 @@ deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) ->
true.
requeue(QPid, MsgIds, ChPid) ->
- delegate_cast(QPid, {requeue, MsgIds, ChPid}).
+ delegate_call(QPid, {requeue, MsgIds, ChPid}, infinity).
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
@@ -397,9 +398,6 @@ delegate_pcall(Pid, Pri, Msg, Timeout) ->
delegate:invoke(Pid,
fun (P) -> gen_server2:pcall(P, Pri, Msg, Timeout) end).
-delegate_cast(Pid, Msg) ->
- delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end).
-
delegate_pcast(Pid, Pri, Msg) ->
delegate:invoke_no_result(Pid,
fun (P) -> gen_server2:pcast(P, Pri, Msg) end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3283cb66..5fdf0ffa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,6 +716,19 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
+ case lookup_ch(ChPid) of
+ not_found ->
+ rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
+ [ChPid]),
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(requeue_and_run(AckTags, State))
+ end;
+
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
@@ -743,18 +756,6 @@ handle_cast({ack, Txn, AckTags, ChPid},
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
-handle_cast({requeue, AckTags, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found ->
- rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
- [ChPid]),
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
- end;
-
handle_cast({unblock, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 2dba00ad..432d6290 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7588c504..76a500ce 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -39,6 +39,8 @@
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([flow_timeout/2]).
+
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
@@ -46,9 +48,12 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid}).
+ consumer_mapping, blocking, queue_collector_pid, flow}).
+
+-record(flow, {server, client, pending}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(FLOW_OK_TIMEOUT, 10000). %% 10 seconds
-define(INFO_KEYS,
[pid,
@@ -66,6 +71,8 @@
-ifdef(use_specs).
+-type(ref() :: any()).
+
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
@@ -75,6 +82,7 @@
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
+-spec(flow_timeout/2 :: (pid(), ref()) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> [info_key()]).
-spec(info/1 :: (pid()) -> [info()]).
@@ -113,6 +121,9 @@ conserve_memory(Pid, Conserve) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
+flow_timeout(Pid, Ref) ->
+ gen_server2:pcast(Pid, 7, {flow_timeout, Ref}).
+
list() ->
pg_local:get_members(rabbit_channels).
@@ -154,7 +165,9 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
most_recently_declared_queue = <<>>,
consumer_mapping = dict:new(),
blocking = dict:new(),
- queue_collector_pid = CollectorPid},
+ queue_collector_pid = CollectorPid,
+ flow = #flow{server = true, client = true,
+ pending = none}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -181,11 +194,9 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:Reason = #amqp_error{} ->
- ok = rollback_and_notify(State),
MethodName = rabbit_misc:method_record_type(Method),
- State#ch.reader_pid ! {channel_exit, State#ch.channel,
- Reason#amqp_error{method = MethodName}},
- {stop, normal, State#ch{state = terminating}};
+ {stop, normal, terminating(Reason#amqp_error{method = MethodName},
+ State)};
exit:normal ->
{stop, normal, State};
_:Reason ->
@@ -209,11 +220,25 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast({conserve_memory, Conserve}, State) ->
- ok = clear_permission_cache(),
- ok = rabbit_writer:send_command(
- State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
- noreply(State).
+handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
+ noreply(State);
+handle_cast({conserve_memory, false}, State = #ch{state = starting}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'channel.open_ok'{}),
+ noreply(State#ch{state = running});
+handle_cast({conserve_memory, Conserve}, State = #ch{state = running}) ->
+ flow_control(not Conserve, State);
+handle_cast({conserve_memory, _Conserve}, State) ->
+ noreply(State);
+
+handle_cast({flow_timeout, Ref},
+ State = #ch{flow = #flow{client = Flow, pending = {Ref, _TRef}}}) ->
+ {stop, normal, terminating(
+ rabbit_misc:amqp_error(
+ precondition_failed,
+ "timeout waiting for channel.flow_ok{active=~w}",
+ [not Flow], none), State)};
+handle_cast({flow_timeout, _Ref}, State) ->
+ {noreply, State}.
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -254,6 +279,11 @@ return_ok(State, false, Msg) -> {reply, Msg, State}.
ok_msg(true, _Msg) -> undefined;
ok_msg(false, Msg) -> Msg.
+terminating(Reason, State = #ch{channel = Channel, reader_pid = Reader}) ->
+ ok = rollback_and_notify(State),
+ Reader ! {channel_exit, Channel, Reason},
+ State#ch{state = terminating}.
+
return_queue_declare_ok(State, NoWait, Q) ->
NewState = State#ch{most_recently_declared_queue =
(Q#amqqueue.name)#resource.name},
@@ -299,21 +329,18 @@ check_write_permitted(Resource, #ch{ username = Username}) ->
check_read_permitted(Resource, #ch{ username = Username}) ->
check_resource_access(Username, Resource, read).
+check_exclusive_access(#amqqueue{exclusive_owner = Owner}, Owner, _MatchType) ->
+ ok;
+check_exclusive_access(#amqqueue{exclusive_owner = none}, _ReaderPid, lax) ->
+ ok;
+check_exclusive_access(#amqqueue{name = QName}, _ReaderPid, _MatchType) ->
+ rabbit_misc:protocol_error(
+ resource_locked,
+ "cannot obtain exclusive access to locked ~s", [rabbit_misc:rs(QName)]).
+
with_exclusive_access_or_die(QName, ReaderPid, F) ->
- case rabbit_amqqueue:with_or_die(
- QName, fun (Q = #amqqueue{exclusive_owner = Owner})
- when Owner =:= none orelse Owner =:= ReaderPid ->
- F(Q);
- (_) ->
- {error, wrong_exclusive_owner}
- end) of
- {error, wrong_exclusive_owner} ->
- rabbit_misc:protocol_error(
- resource_locked, "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QName)]);
- Other ->
- Other
- end.
+ rabbit_amqqueue:with_or_die(
+ QName, fun (Q) -> check_exclusive_access(Q, ReaderPid, lax), F(Q) end).
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
@@ -369,8 +396,10 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ case rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}) of
+ true -> {noreply, State};
+ false -> {reply, #'channel.open_ok'{}, State#ch{state = running}}
+ end;
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -387,13 +416,17 @@ handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
-handle_method(#'basic.publish'{exchange = ExchangeNameBin,
+handle_method(#'basic.publish'{}, _, #ch{flow = #flow{client = false}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "basic.publish received after channel.flow_ok{active=false}", []);
+handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
- Content, State = #ch{ virtual_host = VHostPath,
- transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ mandatory = Mandatory,
+ immediate = Immediate},
+ Content, State = #ch{virtual_host = VHostPath,
+ transaction_id = TxnKey,
+ writer_pid = WriterPid}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -430,13 +463,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
- if DeliveryTag >= NextDeliveryTag ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
- true -> ok
- end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
@@ -453,11 +480,12 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{ writer_pid = WriterPid,
+ reader_pid = ReaderPid,
next_tag = DeliveryTag }) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
- case rabbit_amqqueue:with_or_die(
- QueueName,
+ case with_exclusive_access_or_die(
+ QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
Msg = {_QName, _QPid, _MsgId, Redelivered,
@@ -653,18 +681,17 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
AutoDelete,
Args)
end,
- ok = rabbit_exchange:assert_type(X, CheckedType),
+ ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable,
+ AutoDelete, Args),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
- type = TypeNameBin,
passive = true,
nowait = NoWait},
_, State = #ch{ virtual_host = VHostPath }) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_configure_permitted(ExchangeName, State),
- X = rabbit_exchange:lookup_or_die(ExchangeName),
- ok = rabbit_exchange:assert_type(X, rabbit_exchange:check_type(TypeNameBin)),
+ _ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
@@ -699,25 +726,28 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
- Finish =
- fun (#amqqueue{name = QueueName, exclusive_owner = Owner1} = Q)
- when Owner =:= Owner1 ->
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- resource_locked,
- "cannot obtain exclusive access to locked ~s",
- [rabbit_misc:rs(QueueName)])
- end,
+ Finish = fun (#amqqueue{name = QueueName,
+ durable = Durable1,
+ auto_delete = AutoDelete1} = Q)
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q, Owner, strict),
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
+ end,
+ Q;
+ %% non-equivalence trumps exclusivity arbitrarily
+ (#amqqueue{name = QueueName}) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
+ end,
Q = case rabbit_amqqueue:with(
rabbit_misc:r(VHostPath, queue, QueueNameBin),
Finish) of
@@ -771,7 +801,7 @@ handle_method(#'queue.bind'{queue = QueueNameBin,
routing_key = RoutingKey,
nowait = NoWait,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:add_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:add_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.bind_ok'{},
NoWait, State);
@@ -779,7 +809,7 @@ handle_method(#'queue.unbind'{queue = QueueNameBin,
exchange = ExchangeNameBin,
routing_key = RoutingKey,
arguments = Arguments}, _, State) ->
- binding_action(fun rabbit_exchange:delete_binding/4, ExchangeNameBin,
+ binding_action(fun rabbit_exchange:delete_binding/5, ExchangeNameBin,
QueueNameBin, RoutingKey, Arguments, #'queue.unbind_ok'{},
false, State);
@@ -802,14 +832,14 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State) ->
{reply, #'tx.commit_ok'{}, internal_commit(State)};
handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
@@ -822,7 +852,6 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
-
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
@@ -840,11 +869,25 @@ handle_method(#'channel.flow'{active = false}, _,
blocking = dict:from_list(Queues)}}
end;
-handle_method(#'channel.flow_ok'{active = _}, _, State) ->
- %% TODO: We may want to correlate this to channel.flow messages we
- %% have sent, and complain if we get an unsolicited
- %% channel.flow_ok, or the client refuses our flow request.
- {noreply, State};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Active, client = Flow,
+ pending = {_Ref, TRef}} = F})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, State#ch{flow = F#flow{client = Active, pending = none}}};
+handle_method(#'channel.flow_ok'{active = Active}, _,
+ State = #ch{flow = #flow{server = Flow, client = Flow,
+ pending = {_Ref, TRef}}})
+ when Flow =:= not Active ->
+ {ok, cancel} = timer:cancel(TRef),
+ {noreply, issue_flow(Flow, State)};
+handle_method(#'channel.flow_ok'{}, _, #ch{flow = #flow{pending = none}}) ->
+ rabbit_misc:protocol_error(
+ command_invalid, "unsolicited channel.flow_ok", []);
+handle_method(#'channel.flow_ok'{active = Active}, _, _State) ->
+ rabbit_misc:protocol_error(
+ command_invalid,
+ "received channel.flow_ok{active=~w} has incorrect polarity", [Active]);
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
@@ -852,8 +895,26 @@ handle_method(_MethodRecord, _Content, _State) ->
%%----------------------------------------------------------------------------
+flow_control(Active, State = #ch{flow = #flow{server = Flow, pending = none}})
+ when Flow =:= not Active ->
+ ok = clear_permission_cache(),
+ noreply(issue_flow(Active, State));
+flow_control(Active, State = #ch{flow = F}) ->
+ noreply(State#ch{flow = F#flow{server = Active}}).
+
+issue_flow(Active, State) ->
+ ok = rabbit_writer:send_command(
+ State#ch.writer_pid, #'channel.flow'{active = Active}),
+ Ref = make_ref(),
+ {ok, TRef} = timer:apply_after(?FLOW_OK_TIMEOUT, ?MODULE, flow_timeout,
+ [self(), Ref]),
+ State#ch{flow = #flow{server = Active, client = not Active,
+ pending = {Ref, TRef}}}.
+
binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
- ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath}) ->
+ ReturnMethod, NoWait,
+ State = #ch{virtual_host = VHostPath,
+ reader_pid = ReaderPid}) ->
%% FIXME: connection exception (!) on failure??
%% (see rule named "failure" in spec-XML)
%% FIXME: don't allow binding to internal exchanges -
@@ -864,7 +925,8 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
State),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_read_permitted(ExchangeName, State),
- case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments) of
+ case Fun(ExchangeName, QueueName, ActualRoutingKey, Arguments,
+ fun (_X, Q) -> check_exclusive_access(Q, ReaderPid, lax) end) of
{error, exchange_not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, queue_not_found} ->
@@ -878,10 +940,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
@@ -916,7 +974,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- {ToAcc, PrefixAcc}
+ rabbit_misc:protocol_error(
+ not_found, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 835b1468..c5149b08 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -36,10 +36,12 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
publish/2]).
--export([add_binding/4, delete_binding/4, list_bindings/1]).
+-export([add_binding/5, delete_binding/5, list_bindings/1]).
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
--export([check_type/1, assert_type/2]).
+-export([assert_equivalence/5]).
+-export([assert_args_equivalence/2]).
+-export([check_type/1]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -58,11 +60,15 @@
'queue_not_found' |
'exchange_not_found' |
'exchange_and_queue_not_found'}).
+-type(inner_fun() :: fun((exchange(), queue()) -> any())).
+
-spec(recover/0 :: () -> 'ok').
-spec(declare/5 :: (exchange_name(), exchange_type(), boolean(), boolean(),
amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
--spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
+-spec(assert_equivalence/5 :: (exchange(), atom(), boolean(), boolean(),
+ amqp_table()) -> 'ok').
+-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
@@ -72,11 +78,11 @@
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
--spec(add_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
--spec(delete_binding/4 ::
- (exchange_name(), queue_name(), routing_key(), amqp_table()) ->
+-spec(add_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
+ bind_res()).
+-spec(delete_binding/5 ::
+ (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
-spec(list_bindings/1 :: (vhost()) ->
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
@@ -183,13 +189,36 @@ check_type(TypeBin) ->
T
end.
-assert_type(#exchange{ type = ActualType }, RequiredType)
- when ActualType == RequiredType ->
- ok;
-assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
+assert_equivalence(X = #exchange{ durable = Durable,
+ auto_delete = AutoDelete,
+ type = Type},
+ Type, Durable, AutoDelete,
+ RequiredArgs) ->
+ ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
+ _Args) ->
rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
+ precondition_failed,
+ "cannot redeclare ~s with different type, durable or autodelete value",
+ [rabbit_misc:rs(Name)]).
+
+alternate_exchange_value(Args) ->
+ lists:keysearch(<<"alternate-exchange">>, 1, Args).
+
+assert_args_equivalence(#exchange{ name = Name,
+ arguments = Args },
+ RequiredArgs) ->
+ %% The spec says "Arguments are compared for semantic
+ %% equivalence". The only arg we care about is
+ %% "alternate-exchange".
+ Ae1 = alternate_exchange_value(RequiredArgs),
+ Ae2 = alternate_exchange_value(Args),
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "cannot redeclare ~s with inequivalent args",
+ [rabbit_misc:rs(Name)])
+ end.
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
@@ -367,21 +396,23 @@ call_with_exchange_and_queue(Exchange, Queue, Fun) ->
end
end).
-add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ InnerFun(X, Q),
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
@@ -392,14 +423,15 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
Err
end.
-delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) ->
+delete_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
case binding_action(
ExchangeName, QueueName, RoutingKey, Arguments,
fun (X, Q, B) ->
case mnesia:match_object(rabbit_route, #route{binding = B},
write) of
[] -> {error, binding_not_found};
- _ -> ok = sync_binding(B, Q#amqqueue.durable,
+ _ -> InnerFun(X, Q),
+ ok = sync_binding(B, Q#amqqueue.durable,
fun mnesia:delete_object/3),
{maybe_auto_delete(X), B}
end
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index a8c071e6..85760edc 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -54,7 +54,11 @@ behaviour_info(callbacks) ->
{add_binding, 2},
%% called after bindings have been deleted.
- {remove_bindings, 2}
+ {remove_bindings, 2},
+
+ %% called when comparing exchanges for equivalence - should return ok or
+ %% exit with #amqp_error{}
+ {assert_args_equivalence, 2}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index 9b71e0e1..4f6eb851 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 311654ab..4f9712b1 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 285dab1a..315e8000 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -37,7 +37,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_exchange_type_registry.erl
index 175d15ad..33ea0e92 100644
--- a/src/rabbit_exchange_type_registry.erl
+++ b/src/rabbit_exchange_type_registry.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index 8a3dceea..0e22d545 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -18,11 +18,11 @@
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
+%% (C) 2007-2010 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index dc7f92d1..ecc2613d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -58,6 +59,7 @@ all_tests() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
+ passed = test_memory_pressure(),
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
@@ -839,6 +841,132 @@ test_hooks() ->
end,
passed.
+test_memory_pressure_receiver(Pid) ->
+ receive
+ shutdown ->
+ ok;
+ {send_command, Method} ->
+ ok = case Method of
+ #'channel.flow'{} -> ok;
+ #'basic.qos_ok'{} -> ok;
+ #'channel.open_ok'{} -> ok
+ end,
+ Pid ! Method,
+ test_memory_pressure_receiver(Pid);
+ sync ->
+ Pid ! sync,
+ test_memory_pressure_receiver(Pid)
+ end.
+
+test_memory_pressure_receive_flow(Active) ->
+ receive #'channel.flow'{active = Active} -> ok
+ after 1000 -> throw(failed_to_receive_channel_flow)
+ end,
+ receive #'channel.flow'{} ->
+ throw(pipelining_sync_commands_detected)
+ after 0 ->
+ ok
+ end.
+
+test_memory_pressure_sync(Ch, Writer) ->
+ ok = rabbit_channel:do(Ch, #'basic.qos'{}),
+ Writer ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'basic.qos_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_basic_qos_ok)
+ end.
+
+test_memory_pressure_spawn() ->
+ Me = self(),
+ Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch, #'channel.open'{}),
+ MRef = erlang:monitor(process, Ch),
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ {Writer, Ch, MRef}.
+
+expect_normal_channel_termination(MRef, Ch) ->
+ receive {'DOWN', MRef, process, Ch, normal} -> ok
+ after 1000 -> throw(channel_failed_to_exit)
+ end.
+
+test_memory_pressure() ->
+ {Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
+ [ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
+ Conserve <- [false, false, true, false, true, true, false]],
+ ok = test_memory_pressure_sync(Ch0, Writer0),
+ receive {'DOWN', MRef0, process, Ch0, Info0} ->
+ throw({channel_died_early, Info0})
+ after 0 -> ok
+ end,
+
+ %% we should have just 1 active=false waiting for us
+ ok = test_memory_pressure_receive_flow(false),
+
+ %% if we reply with flow_ok, we should immediately get an
+ %% active=true back
+ ok = rabbit_channel:do(Ch0, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_receive_flow(true),
+
+ %% if we publish at this point, the channel should die
+ Content = #content{class_id = element(1, rabbit_framing:method_id(
+ 'basic.publish')),
+ properties = none,
+ properties_bin = <<>>,
+ payload_fragments_rev = []},
+ ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
+ expect_normal_channel_termination(MRef0, Ch0),
+
+ {Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch1, true),
+ ok = test_memory_pressure_receive_flow(false),
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ ok = test_memory_pressure_sync(Ch1, Writer1),
+ ok = rabbit_channel:conserve_memory(Ch1, false),
+ ok = test_memory_pressure_receive_flow(true),
+ %% send back the wrong flow_ok. Channel should die.
+ ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
+ expect_normal_channel_termination(MRef1, Ch1),
+
+ {_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
+ %% just out of the blue, send a flow_ok. Life should end.
+ ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
+ expect_normal_channel_termination(MRef2, Ch2),
+
+ {_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
+ ok = rabbit_channel:conserve_memory(Ch3, true),
+ receive {'DOWN', MRef3, process, Ch3, _} ->
+ ok
+ after 12000 ->
+ throw(channel_failed_to_exit)
+ end,
+
+ alarm_handler:set_alarm({vm_memory_high_watermark, []}),
+ Me = self(),
+ Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
+ Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
+ self()),
+ ok = rabbit_channel:do(Ch4, #'channel.open'{}),
+ MRef4 = erlang:monitor(process, Ch4),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
+ after 0 -> ok
+ end,
+ alarm_handler:clear_alarm(vm_memory_high_watermark),
+ Writer4 ! sync,
+ receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
+ receive #'channel.open_ok'{} -> ok
+ after 1000 -> throw(failed_to_receive_channel_open_ok)
+ end,
+ rabbit_channel:shutdown(Ch4),
+ expect_normal_channel_termination(MRef4, Ch4),
+
+ passed.
+
test_delegates_async(SecondaryNode) ->
Self = self(),
Sender = fun (Pid) -> Pid ! {invoked, Self} end,