summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-06-21 17:42:33 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-06-21 17:42:33 +0100
commitab83c4a5e6245c97899328f2f9738e7d17cd2b8a (patch)
tree516f7324a08886a2dd4bed11d2c15dd8c462a2aa
parent4626253f57570a8d8fd6adc569b4794d89c6c800 (diff)
parent90b3bbf238abd46d73c01c1d184b81679d15dd3a (diff)
downloadrabbitmq-server-ab83c4a5e6245c97899328f2f9738e7d17cd2b8a.tar.gz
Merged bug 21848 into amqp_0_9_1
-rw-r--r--.hgignore1
-rw-r--r--Makefile31
-rw-r--r--codegen.py102
-rw-r--r--include/rabbit_exchange_type_spec.hrl1
-rw-r--r--include/rabbit_framing_spec.hrl60
-rwxr-xr-xpackaging/common/rabbitmq-server.ocf172
-rwxr-xr-xscripts/rabbitmq-multi3
-rw-r--r--scripts/rabbitmq-multi.bat6
-rwxr-xr-xscripts/rabbitmq-server3
-rw-r--r--scripts/rabbitmq-server.bat6
-rw-r--r--scripts/rabbitmq-service.bat6
-rwxr-xr-xscripts/rabbitmqctl3
-rw-r--r--scripts/rabbitmqctl.bat6
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_channel.erl31
-rw-r--r--src/rabbit_exchange.erl60
-rw-r--r--src/rabbit_exchange_type.erl6
-rw-r--r--src/rabbit_exchange_type_direct.erl4
-rw-r--r--src/rabbit_exchange_type_fanout.erl4
-rw-r--r--src/rabbit_exchange_type_headers.erl4
-rw-r--r--src/rabbit_exchange_type_topic.erl4
-rw-r--r--src/rabbit_framing_channel.erl31
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_writer.erl10
25 files changed, 336 insertions, 237 deletions
diff --git a/.hgignore b/.hgignore
index caaa3ace..7b796b66 100644
--- a/.hgignore
+++ b/.hgignore
@@ -10,6 +10,7 @@ syntax: regexp
^cover/
^dist/
^include/rabbit_framing\.hrl$
+^include/rabbit_framing_spec\.hrl$
^src/rabbit_framing\.erl$
^src/.*\_usage.erl$
^rabbit\.plt$
diff --git a/Makefile b/Makefile
index f9ceeb83..54edde23 100644
--- a/Makefile
+++ b/Makefile
@@ -11,10 +11,10 @@ SOURCE_DIR=src
EBIN_DIR=ebin
INCLUDE_DIR=include
DOCS_DIR=docs
-INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl
+INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl
SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL)
BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES))
-TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS)
+TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS)
WEB_URL=http://stage.rabbitmq.com/
MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml))
WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml)
@@ -40,7 +40,7 @@ BASIC_PLT=basic.plt
RABBIT_PLT=rabbit.plt
ifndef USE_SPECS
-# our type specs rely on features / bug fixes in dialyzer that are
+# our type specs rely on features and bug fixes in dialyzer that are
# only available in R13B01 upwards (R13B01 is eshell 5.7.2)
#
# NB: the test assumes that version number will only contain single digits
@@ -70,6 +70,12 @@ define usage_dep
$(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl
endef
+ifneq "$(SBIN_DIR)" ""
+ifneq "$(TARGET_DIR)" ""
+SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
+endif
+endif
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -84,6 +90,9 @@ $(EBIN_DIR)/%.beam:
$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
$(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@
+$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
+ $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@
+
$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES)
$(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@
@@ -110,7 +119,7 @@ $(BASIC_PLT): $(BEAM_TARGETS)
clean:
rm -f $(EBIN_DIR)/*.beam
rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel
- rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
+ rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc
rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL)
rm -f $(RABBIT_PLT)
rm -f $(DEPS_FILE)
@@ -234,13 +243,7 @@ $(SOURCE_DIR)/%_usage.erl:
docs_all: $(MANPAGES) $(WEB_MANPAGES)
-install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
install: all docs_all install_dirs
- @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false)
- @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false)
- @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false)
-
- mkdir -p $(TARGET_DIR)
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
@@ -256,8 +259,14 @@ install: all docs_all install_dirs
done
install_dirs:
- mkdir -p $(SBIN_DIR)
+ @ OK=true && \
+ { [ -n "$(TARGET_DIR)" ] || { echo "Please set TARGET_DIR."; OK=false; }; } && \
+ { [ -n "$(SBIN_DIR)" ] || { echo "Please set SBIN_DIR."; OK=false; }; } && \
+ { [ -n "$(MAN_DIR)" ] || { echo "Please set MAN_DIR."; OK=false; }; } && $$OK
+
mkdir -p $(TARGET_DIR)/sbin
+ mkdir -p $(SBIN_DIR)
+ mkdir -p $(MAN_DIR)
$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
diff --git a/codegen.py b/codegen.py
index cce25e78..d090be9b 100644
--- a/codegen.py
+++ b/codegen.py
@@ -93,6 +93,27 @@ class PackedMethodBitField:
def full(self):
return self.count() == 8
+def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4):
+ r = [prologue]
+ i = 0
+ for t in things:
+ if i != 0:
+ if i % thingsPerLine == 0:
+ r += [lineSeparator]
+ else:
+ r += [separator]
+ r += [t]
+ i += 1
+ r += [epilogue]
+ return "".join(r)
+
+def prettyType(typeName, subTypes, typesPerLine = 4):
+ """Pretty print a type signature made up of many alternative subtypes"""
+ sTs = multiLineFormat(subTypes,
+ "( ", " | ", "\n | ", " )",
+ thingsPerLine = typesPerLine)
+ return "-type(%s ::\n %s)." % (typeName, sTs)
+
def printFileHeader():
print """%% Autogenerated code. Do not edit.
%%
@@ -314,6 +335,22 @@ def genErl(spec):
bitvalue(true) -> 1;
bitvalue(false) -> 0;
bitvalue(undefined) -> 0.
+
+%% Method signatures
+-ifdef(use_specs).
+-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()).
+-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()).
+-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()).
+-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()).
+-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()).
+-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]).
+-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()).
+-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()).
+-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()).
+-spec(encode_properties/1 :: (amqp_method_record()) -> binary()).
+-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}).
+-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()).
+-endif. % use_specs
"""
for m in methods: genLookupMethodName(m)
print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})."
@@ -389,12 +426,75 @@ def genHrl(spec):
for c in spec.allClasses():
print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields))
+ print "-ifdef(use_specs)."
+ print "%% Various types"
+ print prettyType("amqp_method_name()",
+ [m.erlangName() for m in methods])
+ print prettyType("amqp_method()",
+ ["{%s, %s}" % (m.klass.index, m.index) for m in methods],
+ 6)
+ print prettyType("amqp_method_record()",
+ ["#%s{}" % (m.erlangName()) for m in methods])
+ fieldNames = set()
+ for m in methods:
+ fieldNames.update(m.arguments)
+ fieldNames = [erlangize(f.name) for f in fieldNames]
+ print prettyType("amqp_method_field_name()",
+ fieldNames)
+ print prettyType("amqp_property_record()",
+ ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()])
+ print prettyType("amqp_exception()",
+ ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants])
+ print prettyType("amqp_exception_code()",
+ ["%i" % v for (c, v, cls) in spec.constants])
+ print "-endif. % use_specs"
+
+def genSpec(spec):
+ methods = spec.allMethods()
+
+ printFileHeader()
+ print """% Hard-coded types
+-type(amqp_field_type() ::
+ 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
+ 'table' | 'byte' | 'double' | 'float' | 'long' |
+ 'short' | 'bool' | 'binary' | 'void').
+-type(amqp_property_type() ::
+ 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
+ 'longlongint' | 'timestamp' | 'bit' | 'table').
+%% we could make this more precise but ultimately are limited by
+%% dialyzer's lack of support for recursive types
+-type(amqp_table() :: [{binary(), amqp_field_type(), any()}]).
+%% TODO: make this more precise
+-type(amqp_properties() :: tuple()).
+
+-type(channel_number() :: non_neg_integer()).
+-type(resource_name() :: binary()).
+-type(routing_key() :: binary()).
+-type(username() :: binary()).
+-type(password() :: binary()).
+-type(vhost() :: binary()).
+-type(ctag() :: binary()).
+-type(exchange_type() :: atom()).
+-type(binding_key() :: binary()).
+"""
+ print "% Auto-generated types"
+ classIds = set()
+ for m in spec.allMethods():
+ classIds.add(m.klass.index)
+ print prettyType("amqp_class_id()",
+ ["%i" % ci for ci in classIds])
+
def generateErl(specPath):
genErl(AmqpSpec(specPath))
def generateHrl(specPath):
genHrl(AmqpSpec(specPath))
+def generateSpec(specPath):
+ genSpec(AmqpSpec(specPath))
+
if __name__ == "__main__":
- do_main(generateHrl, generateErl)
+ do_main_dict({"header": generateHrl,
+ "spec": generateSpec,
+ "body": generateErl})
diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl
index 38057beb..cb564365 100644
--- a/include/rabbit_exchange_type_spec.hrl
+++ b/include/rabbit_exchange_type_spec.hrl
@@ -38,5 +38,6 @@
-spec(delete/2 :: (exchange(), list(binding())) -> 'ok').
-spec(add_binding/2 :: (exchange(), binding()) -> 'ok').
-spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok').
+-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
-endif.
diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl
deleted file mode 100644
index 1a979899..00000000
--- a/include/rabbit_framing_spec.hrl
+++ /dev/null
@@ -1,60 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2010 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2010 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
-%% TODO: much of this should be generated
-
--type(amqp_field_type() ::
- 'longstr' | 'signedint' | 'decimal' | 'timestamp' |
- 'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
--type(amqp_property_type() ::
- 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
- 'longlongint' | 'timestamp' | 'bit' | 'table').
-%% we could make this more precise but ultimately are limited by
-%% dialyzer's lack of support for recursive types
--type(amqp_table() :: [{binary(), amqp_field_type(), any()}]).
-%% TODO: make this more precise
--type(amqp_class_id() :: non_neg_integer()).
-%% TODO: make this more precise
--type(amqp_properties() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method() :: tuple()).
-%% TODO: make this more precise
--type(amqp_method_name() :: atom()).
--type(channel_number() :: non_neg_integer()).
--type(resource_name() :: binary()).
--type(routing_key() :: binary()).
--type(username() :: binary()).
--type(password() :: binary()).
--type(vhost() :: binary()).
--type(ctag() :: binary()).
--type(exchange_type() :: atom()).
--type(binding_key() :: binary()).
diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf
index 97c58ea2..db0ed70b 100755
--- a/packaging/common/rabbitmq-server.ocf
+++ b/packaging/common/rabbitmq-server.ocf
@@ -35,21 +35,22 @@
##
## OCF instance parameters
-## OCF_RESKEY_multi
-## OCF_RESKEY_ctl
-## OCF_RESKEY_nodename
-## OCF_RESKEY_ip
-## OCF_RESKEY_port
-## OCF_RESKEY_cluster_config_file
-## OCF_RESKEY_config_file
-## OCF_RESKEY_log_base
-## OCF_RESKEY_mnesia_base
-## OCF_RESKEY_server_start_args
+## OCF_RESKEY_multi
+## OCF_RESKEY_ctl
+## OCF_RESKEY_nodename
+## OCF_RESKEY_ip
+## OCF_RESKEY_port
+## OCF_RESKEY_cluster_config_file
+## OCF_RESKEY_config_file
+## OCF_RESKEY_log_base
+## OCF_RESKEY_mnesia_base
+## OCF_RESKEY_server_start_args
#######################################################################
# Initialization:
-. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs
+: ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/resource.d/heartbeat}
+. ${OCF_FUNCTIONS_DIR}/.ocf-shellfuncs
#######################################################################
@@ -63,7 +64,7 @@ OCF_RESKEY_log_base_default="/var/log/rabbitmq"
: ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}}
meta_data() {
- cat <<END
+ cat <<END
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="rabbitmq-server">
@@ -113,7 +114,7 @@ The IP address for rabbitmq-server to listen on
The IP Port for rabbitmq-server to listen on
</longdesc>
<shortdesc lang="en">IP Port</shortdesc>
-<content type="string" default="" />
+<content type="integer" default="" />
</parameter>
<parameter name="cluster_config_file" unique="0" required="0">
@@ -161,7 +162,8 @@ Additional arguments provided to the server on startup
<actions>
<action name="start" timeout="600" />
<action name="stop" timeout="120" />
-<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" />
+<action name="status" timeout="20" interval="10" />
+<action name="monitor" timeout="20" interval="10" />
<action name="validate-all" timeout="30" />
<action name="meta-data" timeout="5" />
</actions>
@@ -170,8 +172,8 @@ END
}
rabbit_usage() {
- cat <<END
-usage: $0 {start|stop|monitor|validate-all|meta-data}
+ cat <<END
+usage: $0 {start|stop|status|monitor|validate-all|meta-data}
Expects to have a fully populated OCF RA-compliant environment set.
END
@@ -202,35 +204,35 @@ export_vars() {
rabbit_validate_partial() {
if [ ! -x $RABBITMQ_MULTI ]; then
- ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -x $RABBITMQ_CTL ]; then
- ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable";
+ exit $OCF_ERR_INSTALLED;
fi
}
rabbit_validate_full() {
if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then
- ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then
- ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then
- ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
- return $OCF_ERR_ARGS;
+ ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory";
+ exit $OCF_ERR_INSTALLED;
fi
rabbit_validate_partial
@@ -243,25 +245,26 @@ rabbit_status() {
$RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null
rc=$?
case "$rc" in
- 0)
- return $OCF_SUCCESS
- ;;
- 2)
- return $OCF_NOT_RUNNING
- ;;
- *)
- ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
- return $OCF_ERR_GENERIC
+ 0)
+ ocf_log debug "RabbitMQ server is running normally"
+ return $OCF_SUCCESS
+ ;;
+ 2)
+ ocf_log debug "RabbitMQ server is not running"
+ return $OCF_NOT_RUNNING
+ ;;
+ *)
+ ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc"
+ exit $OCF_ERR_GENERIC
esac
}
rabbit_start() {
local rc
- rabbit_validate_full
- rc=$?
- if [ "$rc" != $OCF_SUCCESS ]; then
- return $rc
+ if rabbit_status; then
+ ocf_log info "Resource already running."
+ return $OCF_SUCCESS
fi
export_vars
@@ -270,24 +273,23 @@ rabbit_start() {
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
- return $rc
+ ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc"
+ return $rc
fi
# Spin waiting for the server to come up.
# Let the CRM/LRM time us out if required
start_wait=1
while [ $start_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_SUCCESS ]; then
- start_wait=0
-
- elif [ "$rc" != $OCF_NOT_RUNNING ]; then
- ocf_log info "rabbitmq-server start failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_SUCCESS ]; then
+ start_wait=0
+ elif [ "$rc" != $OCF_NOT_RUNNING ]; then
+ ocf_log info "rabbitmq-server start failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -295,28 +297,34 @@ rabbit_start() {
rabbit_stop() {
local rc
+
+ if ! rabbit_status; then
+ ocf_log info "Resource not running."
+ return $OCF_SUCCESS
+ fi
+
$RABBITMQ_MULTI stop_all &
rc=$?
if [ "$rc" != 0 ]; then
- ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
- return $rc
+ ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc"
+ return $rc
fi
# Spin waiting for the server to shut down.
# Let the CRM/LRM time us out if required
stop_wait=1
while [ $stop_wait = 1 ]; do
- rabbit_status
- rc=$?
- if [ "$rc" = $OCF_NOT_RUNNING ]; then
- stop_wait=0
+ rabbit_status
+ rc=$?
+ if [ "$rc" = $OCF_NOT_RUNNING ]; then
+ stop_wait=0
break
- elif [ "$rc" != $OCF_SUCCESS ]; then
- ocf_log info "rabbitmq-server stop failed: $rc"
- return $OCF_ERR_GENERIC
- fi
- sleep 2
+ elif [ "$rc" != $OCF_SUCCESS ]; then
+ ocf_log info "rabbitmq-server stop failed: $rc"
+ exit $OCF_ERR_GENERIC
+ fi
+ sleep 1
done
return $OCF_SUCCESS
@@ -329,34 +337,38 @@ rabbit_monitor() {
case $__OCF_ACTION in
meta-data)
- meta_data
- exit $OCF_SUCCESS
- ;;
+ meta_data
+ exit $OCF_SUCCESS
+ ;;
usage|help)
- rabbit_usage
- exit $OCF_SUCCESS
- ;;
+ rabbit_usage
+ exit $OCF_SUCCESS
+ ;;
esac
-rabbit_validate_partial || exit
+if ocf_is_probe; then
+ rabbit_validate_partial
+else
+ rabbit_validate_full
+fi
case $__OCF_ACTION in
start)
- rabbit_start
+ rabbit_start
;;
stop)
- rabbit_stop
+ rabbit_stop
;;
- monitor)
- rabbit_monitor
+ status|monitor)
+ rabbit_monitor
;;
validate-all)
exit $OCF_SUCCESS
- ;;
+ ;;
*)
- rabbit_usage
- exit $OCF_ERR_UNIMPLEMENTED
- ;;
+ rabbit_usage
+ exit $OCF_ERR_UNIMPLEMENTED
+ ;;
esac
-exit $? \ No newline at end of file
+exit $?
diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi
index 8341d35c..59050692 100755
--- a/scripts/rabbitmq-multi
+++ b/scripts/rabbitmq-multi
@@ -29,7 +29,8 @@
##
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SCRIPT_HOME=$(dirname $0)
PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat
index a4b7f2e9..a4f8c8b4 100644
--- a/scripts/rabbitmq-multi.bat
+++ b/scripts/rabbitmq-multi.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index ccdfc401..2261b56e 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
SERVER_ERL_ARGS="+K true +A30 +P 1048576 \
-kernel inet_default_listen_options [{nodelay,true}] \
-kernel inet_default_connect_options [{nodelay,true}]"
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 57fe1328..a290f935 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index a4021fd6..bd117b83 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -48,8 +48,12 @@ if "!RABBITMQ_BASE!"=="" (
set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl
index cfb775eb..92e5312b 100755
--- a/scripts/rabbitmqctl
+++ b/scripts/rabbitmqctl
@@ -30,7 +30,8 @@
## Contributor(s): ______________________________________.
##
-NODENAME=rabbit
+[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s`
+NODENAME=rabbit@${HOSTNAME%%.*}
. `dirname $0`/rabbitmq-env
diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat
index 55572451..563b9e58 100644
--- a/scripts/rabbitmqctl.bat
+++ b/scripts/rabbitmqctl.bat
@@ -38,8 +38,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e7c92664..5fdf0ffa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -716,16 +716,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
{Count, BQS1} = BQ:purge(BQS),
reply({ok, Count}, State#q{backing_queue_state = BQS1});
-handle_call({requeue, AckTags, ChPid}, _From, State) ->
+handle_call({requeue, AckTags, ChPid}, From, State) ->
+ gen_server2:reply(From, ok),
case lookup_ch(ChPid) of
not_found ->
rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n",
[ChPid]),
- reply(ok, State);
+ noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
store_ch_record(C#cr{acktags = ChAckTags1}),
- reply(ok, requeue_and_run(AckTags, State))
+ noreply(requeue_and_run(AckTags, State))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 27a1275a..81cf3cee 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -57,7 +57,7 @@
-type(frame() :: [binary()]).
-spec(build_simple_method_frame/2 ::
- (channel_number(), amqp_method()) -> frame()).
+ (channel_number(), amqp_method_record()) -> frame()).
-spec(build_simple_content_frames/3 ::
(channel_number(), content(), non_neg_integer()) -> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 58c56cc0..3dfc026b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -75,8 +75,8 @@
-spec(start_link/6 ::
(channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()).
--spec(do/2 :: (pid(), amqp_method()) -> 'ok').
--spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
+-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
-spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok').
@@ -344,7 +344,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) ->
expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath,
most_recently_declared_queue = MRDQ }) ->
rabbit_misc:r(VHostPath, queue, MRDQ);
@@ -354,7 +354,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) ->
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = <<>> }) ->
rabbit_misc:protocol_error(
- not_allowed, "no previously declared queue", []);
+ not_found, "no previously declared queue", []);
expand_routing_key_shortcut(<<>>, <<>>,
#ch{ most_recently_declared_queue = MRDQ }) ->
MRDQ;
@@ -460,13 +460,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
- next_tag = NextDeliveryTag,
unacked_message_q = UAMQ}) ->
- if DeliveryTag >= NextDeliveryTag ->
- rabbit_misc:protocol_error(
- command_invalid, "unknown delivery tag ~w", [DeliveryTag]);
- true -> ok
- end,
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
Participants = ack(TxnKey, Acked),
{noreply, case TxnKey of
@@ -528,9 +522,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Other -> Other
end,
- %% In order to ensure that the consume_ok gets sent before
- %% any messages are sent to the consumer, we get the queue
- %% process to send the consume_ok on our behalf.
+ %% We get the queue process to send the consume_ok on our
+ %% behalf. This is for symmetry with basic.cancel - see
+ %% the comment in that method for why.
case with_exclusive_access_or_die(
QueueName, ReaderPid,
fun (Q) ->
@@ -843,14 +837,14 @@ handle_method(#'tx.select'{}, _, State) ->
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.commit'{}, _, State) ->
{reply, #'tx.commit_ok'{}, internal_commit(State)};
handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
- not_allowed, "channel is not transactional", []);
+ precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
@@ -951,10 +945,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
not_found, "no binding ~s between ~s and ~s",
[RoutingKey, rabbit_misc:rs(ExchangeName),
rabbit_misc:rs(QueueName)]);
- {error, durability_settings_incompatible} ->
- rabbit_misc:protocol_error(
- not_allowed, "durability settings of ~s incompatible with ~s",
- [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]);
ok -> return_ok(State, NoWait, ReturnMethod)
end.
@@ -989,7 +979,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
QTail, DeliveryTag, Multiple)
end;
{empty, _} ->
- {ToAcc, PrefixAcc}
+ rabbit_misc:protocol_error(
+ not_found, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 841161db..eb6f3e49 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -40,7 +40,8 @@
-export([delete/2]).
-export([delete_queue_bindings/1, delete_transient_queue_bindings/1]).
-export([assert_equivalence/4]).
--export([check_type/1, assert_type/2]).
+-export([assert_args_equivalence/2]).
+-export([check_type/1]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -65,6 +66,7 @@
-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok').
+-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
-spec(lookup_or_die/1 :: (exchange_name()) -> exchange()).
-spec(list/1 :: (vhost()) -> [exchange()]).
@@ -76,7 +78,7 @@
-spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}).
-spec(add_binding/5 ::
(exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
- bind_res() | {'error', 'durability_settings_incompatible'}).
+ bind_res()).
-spec(delete_binding/5 ::
(exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) ->
bind_res() | {'error', 'binding_not_found'}).
@@ -184,28 +186,22 @@ check_type(TypeBin) ->
T
end.
-assert_equivalence(X = #exchange{ durable = ActualDurable },
- RequiredType, RequiredDurable, RequiredArgs)
- when ActualDurable == RequiredDurable ->
- ok = assert_type(X, RequiredType),
- ok = assert_args_equivalence(X, RequiredArgs);
-assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) ->
+assert_equivalence(X = #exchange{ durable = Durable,
+ type = Type},
+ Type, Durable,
+ RequiredArgs) ->
+ ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
+assert_equivalence(#exchange{ name = Name }, _Type, _Durable,
+ _Args) ->
rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s with different durable value",
+ precondition_failed,
+ "cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-assert_type(#exchange{ type = ActualType }, RequiredType)
- when ActualType == RequiredType ->
- ok;
-assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) ->
- rabbit_misc:protocol_error(
- not_allowed, "cannot redeclare ~s of type '~s' with type '~s'",
- [rabbit_misc:rs(Name), ActualType, RequiredType]).
-
alternate_exchange_value(Args) ->
lists:keysearch(<<"alternate-exchange">>, 1, Args).
-assert_args_equivalence(#exchange{ name = Name,
+assert_args_equivalence(#exchange{ name = Name,
arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
@@ -213,9 +209,9 @@ assert_args_equivalence(#exchange{ name = Name,
%% "alternate-exchange".
Ae1 = alternate_exchange_value(RequiredArgs),
Ae2 = alternate_exchange_value(Args),
- if Ae1==Ae2 -> ok;
- true -> rabbit_misc:protocol_error(
- not_allowed,
+ if Ae1==Ae2 -> ok;
+ true -> rabbit_misc:protocol_error(
+ precondition_failed,
"cannot redeclare ~s with inequivalent args",
[rabbit_misc:rs(Name)])
end.
@@ -400,19 +396,17 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) ->
fun (X, Q, B) ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
- %% failing on e.g., the durability being different.
+ %% anything else
InnerFun(X, Q),
- if Q#amqqueue.durable and not(X#exchange.durable) ->
- {error, durability_settings_incompatible};
- true ->
- case mnesia:read({rabbit_route, B}) of
- [] ->
- sync_binding(B, Q#amqqueue.durable,
- fun mnesia:write/3),
- {new, X, B};
- [_R] ->
- {existing, X, B}
- end
+ case mnesia:read({rabbit_route, B}) of
+ [] ->
+ sync_binding(B,
+ X#exchange.durable andalso
+ Q#amqqueue.durable,
+ fun mnesia:write/3),
+ {new, X, B};
+ [_R] ->
+ {existing, X, B}
end
end) of
{new, Exchange = #exchange{ type = Type }, Binding} ->
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 699250f7..85760edc 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -54,7 +54,11 @@ behaviour_info(callbacks) ->
{add_binding, 2},
%% called after bindings have been deleted.
- {remove_bindings, 2}
+ {remove_bindings, 2},
+
+ %% called when comparing exchanges for equivalence - should return ok or
+ %% exit with #amqp_error{}
+ {assert_args_equivalence, 2}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index c3fb2588..4f6eb851 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index 62c862a5..4f9712b1 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl
index 0991bf0d..315e8000 100644
--- a/src/rabbit_exchange_type_headers.erl
+++ b/src/rabbit_exchange_type_headers.erl
@@ -37,7 +37,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index e42c4518..0e22d545 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -36,7 +36,7 @@
-export([description/0, publish/2]).
-export([validate/1, create/1, recover/2, delete/2,
- add_binding/2, remove_bindings/2]).
+ add_binding/2, remove_bindings/2, assert_args_equivalence/2]).
-include("rabbit_exchange_type_spec.hrl").
-rabbit_boot_step({?MODULE,
@@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok.
delete(_X, _Bs) -> ok.
add_binding(_X, _B) -> ok.
remove_bindings(_X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index b7c6aa96..161dfd84 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -73,14 +73,23 @@ read_frame(ChannelPid) ->
end.
mainloop(ChannelPid) ->
- {method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
- case rabbit_framing:method_has_content(MethodName) of
- true -> rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, MethodName));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid).
+ Decoded = read_frame(ChannelPid),
+ case Decoded of
+ {method, MethodName, FieldsBin} ->
+ Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
+ case rabbit_framing:method_has_content(MethodName) of
+ true -> rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid,
+ MethodName));
+ false -> rabbit_channel:do(ChannelPid, Method)
+ end,
+ ?MODULE:mainloop(ChannelPid);
+ _ ->
+ rabbit_misc:protocol_error(
+ unexpected_frame,
+ "expected method frame, got ~p instead",
+ [Decoded])
+ end.
collect_content(ChannelPid, MethodName) ->
{ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
@@ -94,14 +103,14 @@ collect_content(ChannelPid, MethodName) ->
payload_fragments_rev = Payload};
true ->
rabbit_misc:protocol_error(
- command_invalid,
+ unexpected_frame,
"expected content header for class ~w, "
"got one for class ~w instead",
[ClassId, HeaderClassId])
end;
_ ->
rabbit_misc:protocol_error(
- command_invalid,
+ unexpected_frame,
"expected content header for class ~w, "
"got non content header frame instead",
[ClassId])
@@ -117,7 +126,7 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
[FragmentBin | Acc]);
_ ->
rabbit_misc:protocol_error(
- command_invalid,
+ unexpected_frame,
"expected content body, got non content body frame instead",
[])
end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index bb880dad..3e03ae0c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -105,6 +105,8 @@
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closing*
%% receive frame -> ignore, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
@@ -121,6 +123,8 @@
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closed*
%% receive connection.close_ok -> self() ! terminate_connection,
%% *closed*
%% receive frame -> ignore, *closed*
@@ -659,6 +663,12 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ when CS =:= closing; CS =:= closed ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 54c60f5b..3d10dc12 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -50,17 +50,17 @@
-spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
-spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()).
--spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok').
+-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok').
+-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok').
-spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok').
-spec(send_command_and_signal_back/4 ::
(pid(), amqp_method(), content(), pid()) -> 'ok').
-spec(send_command_and_notify/5 ::
- (pid(), pid(), pid(), amqp_method(), content()) -> 'ok').
+ (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok').
-spec(internal_send_command/3 ::
- (socket(), channel_number(), amqp_method()) -> 'ok').
+ (socket(), channel_number(), amqp_method_record()) -> 'ok').
-spec(internal_send_command/5 ::
- (socket(), channel_number(), amqp_method(),
+ (socket(), channel_number(), amqp_method_record(),
content(), non_neg_integer()) -> 'ok').
-endif.