diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2010-06-21 17:42:33 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2010-06-21 17:42:33 +0100 |
commit | ab83c4a5e6245c97899328f2f9738e7d17cd2b8a (patch) | |
tree | 516f7324a08886a2dd4bed11d2c15dd8c462a2aa | |
parent | 4626253f57570a8d8fd6adc569b4794d89c6c800 (diff) | |
parent | 90b3bbf238abd46d73c01c1d184b81679d15dd3a (diff) | |
download | rabbitmq-server-ab83c4a5e6245c97899328f2f9738e7d17cd2b8a.tar.gz |
Merged bug 21848 into amqp_0_9_1
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile | 31 | ||||
-rw-r--r-- | codegen.py | 102 | ||||
-rw-r--r-- | include/rabbit_exchange_type_spec.hrl | 1 | ||||
-rw-r--r-- | include/rabbit_framing_spec.hrl | 60 | ||||
-rwxr-xr-x | packaging/common/rabbitmq-server.ocf | 172 | ||||
-rwxr-xr-x | scripts/rabbitmq-multi | 3 | ||||
-rw-r--r-- | scripts/rabbitmq-multi.bat | 6 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 3 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 6 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 6 | ||||
-rwxr-xr-x | scripts/rabbitmqctl | 3 | ||||
-rw-r--r-- | scripts/rabbitmqctl.bat | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 7 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 60 | ||||
-rw-r--r-- | src/rabbit_exchange_type.erl | 6 | ||||
-rw-r--r-- | src/rabbit_exchange_type_direct.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_headers.erl | 4 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 4 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 31 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 10 |
25 files changed, 336 insertions, 237 deletions
@@ -10,6 +10,7 @@ syntax: regexp ^cover/ ^dist/ ^include/rabbit_framing\.hrl$ +^include/rabbit_framing_spec\.hrl$ ^src/rabbit_framing\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ @@ -11,10 +11,10 @@ SOURCE_DIR=src EBIN_DIR=ebin INCLUDE_DIR=include DOCS_DIR=docs -INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl +INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) -TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) +TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(BEAM_TARGETS) WEB_URL=http://stage.rabbitmq.com/ MANPAGES=$(patsubst %.xml, %.gz, $(wildcard $(DOCS_DIR)/*.[0-9].xml)) WEB_MANPAGES=$(patsubst %.xml, %.man.xml, $(wildcard $(DOCS_DIR)/*.[0-9].xml) $(DOCS_DIR)/rabbitmq-service.xml) @@ -40,7 +40,7 @@ BASIC_PLT=basic.plt RABBIT_PLT=rabbit.plt ifndef USE_SPECS -# our type specs rely on features / bug fixes in dialyzer that are +# our type specs rely on features and bug fixes in dialyzer that are # only available in R13B01 upwards (R13B01 is eshell 5.7.2) # # NB: the test assumes that version number will only contain single digits @@ -70,6 +70,12 @@ define usage_dep $(call usage_xml_to_erl, $(1)): $(1) $(DOCS_DIR)/usage.xsl endef +ifneq "$(SBIN_DIR)" "" +ifneq "$(TARGET_DIR)" "" +SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +endif +endif + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -84,6 +90,9 @@ $(EBIN_DIR)/%.beam: $(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing_spec.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) + $(PYTHON) codegen.py spec $(AMQP_SPEC_JSON_FILES) $@ + $(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ @@ -110,7 +119,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(INCLUDE_DIR)/rabbit_framing_spec.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -234,13 +243,7 @@ $(SOURCE_DIR)/%_usage.erl: docs_all: $(MANPAGES) $(WEB_MANPAGES) -install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) install: all docs_all install_dirs - @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) - @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) - @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) - - mkdir -p $(TARGET_DIR) cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* @@ -256,8 +259,14 @@ install: all docs_all install_dirs done install_dirs: - mkdir -p $(SBIN_DIR) + @ OK=true && \ + { [ -n "$(TARGET_DIR)" ] || { echo "Please set TARGET_DIR."; OK=false; }; } && \ + { [ -n "$(SBIN_DIR)" ] || { echo "Please set SBIN_DIR."; OK=false; }; } && \ + { [ -n "$(MAN_DIR)" ] || { echo "Please set MAN_DIR."; OK=false; }; } && $$OK + mkdir -p $(TARGET_DIR)/sbin + mkdir -p $(SBIN_DIR) + mkdir -p $(MAN_DIR) $(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) @@ -93,6 +93,27 @@ class PackedMethodBitField: def full(self): return self.count() == 8 +def multiLineFormat(things, prologue, separator, lineSeparator, epilogue, thingsPerLine = 4): + r = [prologue] + i = 0 + for t in things: + if i != 0: + if i % thingsPerLine == 0: + r += [lineSeparator] + else: + r += [separator] + r += [t] + i += 1 + r += [epilogue] + return "".join(r) + +def prettyType(typeName, subTypes, typesPerLine = 4): + """Pretty print a type signature made up of many alternative subtypes""" + sTs = multiLineFormat(subTypes, + "( ", " | ", "\n | ", " )", + thingsPerLine = typesPerLine) + return "-type(%s ::\n %s)." % (typeName, sTs) + def printFileHeader(): print """%% Autogenerated code. Do not edit. %% @@ -314,6 +335,22 @@ def genErl(spec): bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. + +%% Method signatures +-ifdef(use_specs). +-spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). +-spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). +-spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). +-spec(is_method_synchronous/1 :: (amqp_method_record()) -> boolean()). +-spec(method_record/1 :: (amqp_method_name()) -> amqp_method_record()). +-spec(method_fieldnames/1 :: (amqp_method_name()) -> [amqp_method_field_name()]). +-spec(decode_method_fields/2 :: (amqp_method_name(), binary()) -> amqp_method_record()). +-spec(decode_properties/2 :: (non_neg_integer(), binary()) -> amqp_property_record()). +-spec(encode_method_fields/1 :: (amqp_method_record()) -> binary()). +-spec(encode_properties/1 :: (amqp_method_record()) -> binary()). +-spec(lookup_amqp_exception/1 :: (amqp_exception()) -> {boolean(), amqp_exception_code(), binary()}). +-spec(amqp_exception/1 :: (amqp_exception_code()) -> amqp_exception()). +-endif. % use_specs """ for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." @@ -389,12 +426,75 @@ def genHrl(spec): for c in spec.allClasses(): print "-record('P_%s', {%s})." % (erlangize(c.name), fieldNameList(c.fields)) + print "-ifdef(use_specs)." + print "%% Various types" + print prettyType("amqp_method_name()", + [m.erlangName() for m in methods]) + print prettyType("amqp_method()", + ["{%s, %s}" % (m.klass.index, m.index) for m in methods], + 6) + print prettyType("amqp_method_record()", + ["#%s{}" % (m.erlangName()) for m in methods]) + fieldNames = set() + for m in methods: + fieldNames.update(m.arguments) + fieldNames = [erlangize(f.name) for f in fieldNames] + print prettyType("amqp_method_field_name()", + fieldNames) + print prettyType("amqp_property_record()", + ["#'P_%s'{}" % erlangize(c.name) for c in spec.allClasses()]) + print prettyType("amqp_exception()", + ["'%s'" % erlangConstantName(c).lower() for (c, v, cls) in spec.constants]) + print prettyType("amqp_exception_code()", + ["%i" % v for (c, v, cls) in spec.constants]) + print "-endif. % use_specs" + +def genSpec(spec): + methods = spec.allMethods() + + printFileHeader() + print """% Hard-coded types +-type(amqp_field_type() :: + 'longstr' | 'signedint' | 'decimal' | 'timestamp' | + 'table' | 'byte' | 'double' | 'float' | 'long' | + 'short' | 'bool' | 'binary' | 'void'). +-type(amqp_property_type() :: + 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | + 'longlongint' | 'timestamp' | 'bit' | 'table'). +%% we could make this more precise but ultimately are limited by +%% dialyzer's lack of support for recursive types +-type(amqp_table() :: [{binary(), amqp_field_type(), any()}]). +%% TODO: make this more precise +-type(amqp_properties() :: tuple()). + +-type(channel_number() :: non_neg_integer()). +-type(resource_name() :: binary()). +-type(routing_key() :: binary()). +-type(username() :: binary()). +-type(password() :: binary()). +-type(vhost() :: binary()). +-type(ctag() :: binary()). +-type(exchange_type() :: atom()). +-type(binding_key() :: binary()). +""" + print "% Auto-generated types" + classIds = set() + for m in spec.allMethods(): + classIds.add(m.klass.index) + print prettyType("amqp_class_id()", + ["%i" % ci for ci in classIds]) + def generateErl(specPath): genErl(AmqpSpec(specPath)) def generateHrl(specPath): genHrl(AmqpSpec(specPath)) +def generateSpec(specPath): + genSpec(AmqpSpec(specPath)) + if __name__ == "__main__": - do_main(generateHrl, generateErl) + do_main_dict({"header": generateHrl, + "spec": generateSpec, + "body": generateErl}) diff --git a/include/rabbit_exchange_type_spec.hrl b/include/rabbit_exchange_type_spec.hrl index 38057beb..cb564365 100644 --- a/include/rabbit_exchange_type_spec.hrl +++ b/include/rabbit_exchange_type_spec.hrl @@ -38,5 +38,6 @@ -spec(delete/2 :: (exchange(), list(binding())) -> 'ok'). -spec(add_binding/2 :: (exchange(), binding()) -> 'ok'). -spec(remove_bindings/2 :: (exchange(), list(binding())) -> 'ok'). +-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok'). -endif. diff --git a/include/rabbit_framing_spec.hrl b/include/rabbit_framing_spec.hrl deleted file mode 100644 index 1a979899..00000000 --- a/include/rabbit_framing_spec.hrl +++ /dev/null @@ -1,60 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - -%% TODO: much of this should be generated - --type(amqp_field_type() :: - 'longstr' | 'signedint' | 'decimal' | 'timestamp' | - 'table' | 'byte' | 'double' | 'float' | 'long' | - 'short' | 'bool' | 'binary' | 'void'). --type(amqp_property_type() :: - 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | - 'longlongint' | 'timestamp' | 'bit' | 'table'). -%% we could make this more precise but ultimately are limited by -%% dialyzer's lack of support for recursive types --type(amqp_table() :: [{binary(), amqp_field_type(), any()}]). -%% TODO: make this more precise --type(amqp_class_id() :: non_neg_integer()). -%% TODO: make this more precise --type(amqp_properties() :: tuple()). -%% TODO: make this more precise --type(amqp_method() :: tuple()). -%% TODO: make this more precise --type(amqp_method_name() :: atom()). --type(channel_number() :: non_neg_integer()). --type(resource_name() :: binary()). --type(routing_key() :: binary()). --type(username() :: binary()). --type(password() :: binary()). --type(vhost() :: binary()). --type(ctag() :: binary()). --type(exchange_type() :: atom()). --type(binding_key() :: binary()). diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index 97c58ea2..db0ed70b 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -35,21 +35,22 @@ ## ## OCF instance parameters -## OCF_RESKEY_multi -## OCF_RESKEY_ctl -## OCF_RESKEY_nodename -## OCF_RESKEY_ip -## OCF_RESKEY_port -## OCF_RESKEY_cluster_config_file -## OCF_RESKEY_config_file -## OCF_RESKEY_log_base -## OCF_RESKEY_mnesia_base -## OCF_RESKEY_server_start_args +## OCF_RESKEY_multi +## OCF_RESKEY_ctl +## OCF_RESKEY_nodename +## OCF_RESKEY_ip +## OCF_RESKEY_port +## OCF_RESKEY_cluster_config_file +## OCF_RESKEY_config_file +## OCF_RESKEY_log_base +## OCF_RESKEY_mnesia_base +## OCF_RESKEY_server_start_args ####################################################################### # Initialization: -. ${OCF_ROOT}/resource.d/heartbeat/.ocf-shellfuncs +: ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/resource.d/heartbeat} +. ${OCF_FUNCTIONS_DIR}/.ocf-shellfuncs ####################################################################### @@ -63,7 +64,7 @@ OCF_RESKEY_log_base_default="/var/log/rabbitmq" : ${OCF_RESKEY_log_base=${OCF_RESKEY_log_base_default}} meta_data() { - cat <<END + cat <<END <?xml version="1.0"?> <!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd"> <resource-agent name="rabbitmq-server"> @@ -113,7 +114,7 @@ The IP address for rabbitmq-server to listen on The IP Port for rabbitmq-server to listen on </longdesc> <shortdesc lang="en">IP Port</shortdesc> -<content type="string" default="" /> +<content type="integer" default="" /> </parameter> <parameter name="cluster_config_file" unique="0" required="0"> @@ -161,7 +162,8 @@ Additional arguments provided to the server on startup <actions> <action name="start" timeout="600" /> <action name="stop" timeout="120" /> -<action name="monitor" timeout="20" interval="10" depth="0" start-delay="0" /> +<action name="status" timeout="20" interval="10" /> +<action name="monitor" timeout="20" interval="10" /> <action name="validate-all" timeout="30" /> <action name="meta-data" timeout="5" /> </actions> @@ -170,8 +172,8 @@ END } rabbit_usage() { - cat <<END -usage: $0 {start|stop|monitor|validate-all|meta-data} + cat <<END +usage: $0 {start|stop|status|monitor|validate-all|meta-data} Expects to have a fully populated OCF RA-compliant environment set. END @@ -202,35 +204,35 @@ export_vars() { rabbit_validate_partial() { if [ ! -x $RABBITMQ_MULTI ]; then - ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server multi $RABBITMQ_MULTI does not exist or is not executable"; + exit $OCF_ERR_INSTALLED; fi if [ ! -x $RABBITMQ_CTL ]; then - ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server ctl $RABBITMQ_CTL does not exist or is not executable"; + exit $OCF_ERR_INSTALLED; fi } rabbit_validate_full() { if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_LOG_BASE ] && [ ! -d $RABBITMQ_LOG_BASE ]; then - ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server log_base $RABBITMQ_LOG_BASE does not exist or is not a directory"; + exit $OCF_ERR_INSTALLED; fi if [ ! -z $RABBITMQ_MNESIA_BASE ] && [ ! -d $RABBITMQ_MNESIA_BASE ]; then - ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; - return $OCF_ERR_ARGS; + ocf_log err "rabbitmq-server mnesia_base $RABBITMQ_MNESIA_BASE does not exist or is not a directory"; + exit $OCF_ERR_INSTALLED; fi rabbit_validate_partial @@ -243,25 +245,26 @@ rabbit_status() { $RABBITMQ_CTL $NODENAME_ARG status > /dev/null 2> /dev/null rc=$? case "$rc" in - 0) - return $OCF_SUCCESS - ;; - 2) - return $OCF_NOT_RUNNING - ;; - *) - ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" - return $OCF_ERR_GENERIC + 0) + ocf_log debug "RabbitMQ server is running normally" + return $OCF_SUCCESS + ;; + 2) + ocf_log debug "RabbitMQ server is not running" + return $OCF_NOT_RUNNING + ;; + *) + ocf_log err "Unexpected return from rabbitmqctl $NODENAME_ARG status: $rc" + exit $OCF_ERR_GENERIC esac } rabbit_start() { local rc - rabbit_validate_full - rc=$? - if [ "$rc" != $OCF_SUCCESS ]; then - return $rc + if rabbit_status; then + ocf_log info "Resource already running." + return $OCF_SUCCESS fi export_vars @@ -270,24 +273,23 @@ rabbit_start() { rc=$? if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" - return $rc + ocf_log err "rabbitmq-server start command failed: $RABBITMQ_MULTI start_all 1, $rc" + return $rc fi # Spin waiting for the server to come up. # Let the CRM/LRM time us out if required start_wait=1 while [ $start_wait = 1 ]; do - rabbit_status - rc=$? - if [ "$rc" = $OCF_SUCCESS ]; then - start_wait=0 - - elif [ "$rc" != $OCF_NOT_RUNNING ]; then - ocf_log info "rabbitmq-server start failed: $rc" - return $OCF_ERR_GENERIC - fi - sleep 2 + rabbit_status + rc=$? + if [ "$rc" = $OCF_SUCCESS ]; then + start_wait=0 + elif [ "$rc" != $OCF_NOT_RUNNING ]; then + ocf_log info "rabbitmq-server start failed: $rc" + exit $OCF_ERR_GENERIC + fi + sleep 1 done return $OCF_SUCCESS @@ -295,28 +297,34 @@ rabbit_start() { rabbit_stop() { local rc + + if ! rabbit_status; then + ocf_log info "Resource not running." + return $OCF_SUCCESS + fi + $RABBITMQ_MULTI stop_all & rc=$? if [ "$rc" != 0 ]; then - ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" - return $rc + ocf_log err "rabbitmq-server stop command failed: $RABBITMQ_MULTI stop_all, $rc" + return $rc fi # Spin waiting for the server to shut down. # Let the CRM/LRM time us out if required stop_wait=1 while [ $stop_wait = 1 ]; do - rabbit_status - rc=$? - if [ "$rc" = $OCF_NOT_RUNNING ]; then - stop_wait=0 + rabbit_status + rc=$? + if [ "$rc" = $OCF_NOT_RUNNING ]; then + stop_wait=0 break - elif [ "$rc" != $OCF_SUCCESS ]; then - ocf_log info "rabbitmq-server stop failed: $rc" - return $OCF_ERR_GENERIC - fi - sleep 2 + elif [ "$rc" != $OCF_SUCCESS ]; then + ocf_log info "rabbitmq-server stop failed: $rc" + exit $OCF_ERR_GENERIC + fi + sleep 1 done return $OCF_SUCCESS @@ -329,34 +337,38 @@ rabbit_monitor() { case $__OCF_ACTION in meta-data) - meta_data - exit $OCF_SUCCESS - ;; + meta_data + exit $OCF_SUCCESS + ;; usage|help) - rabbit_usage - exit $OCF_SUCCESS - ;; + rabbit_usage + exit $OCF_SUCCESS + ;; esac -rabbit_validate_partial || exit +if ocf_is_probe; then + rabbit_validate_partial +else + rabbit_validate_full +fi case $__OCF_ACTION in start) - rabbit_start + rabbit_start ;; stop) - rabbit_stop + rabbit_stop ;; - monitor) - rabbit_monitor + status|monitor) + rabbit_monitor ;; validate-all) exit $OCF_SUCCESS - ;; + ;; *) - rabbit_usage - exit $OCF_ERR_UNIMPLEMENTED - ;; + rabbit_usage + exit $OCF_ERR_UNIMPLEMENTED + ;; esac -exit $?
\ No newline at end of file +exit $? diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 8341d35c..59050692 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,7 +29,8 @@ ## ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a4b7f2e9..a4f8c8b4 100644 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index ccdfc401..2261b56e 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,7 +30,8 @@ ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 57fe1328..a290f935 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -42,8 +42,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\RabbitMQ
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index a4021fd6..bd117b83 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -48,8 +48,12 @@ if "!RABBITMQ_BASE!"=="" ( set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index cfb775eb..92e5312b 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,7 +30,8 @@ ## Contributor(s): ______________________________________. ## -NODENAME=rabbit +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` +NODENAME=rabbit@${HOSTNAME%%.*} . `dirname $0`/rabbitmq-env diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index 55572451..563b9e58 100644 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -38,8 +38,12 @@ set TDP0=%~dp0 set STAR=%*
setlocal enabledelayedexpansion
+if "!COMPUTERNAME!"=="" (
+ set COMPUTERNAME=localhost
+)
+
if "!RABBITMQ_NODENAME!"=="" (
- set RABBITMQ_NODENAME=rabbit
+ set RABBITMQ_NODENAME=rabbit@!COMPUTERNAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e7c92664..5fdf0ffa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,16 +716,17 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); -handle_call({requeue, AckTags, ChPid}, _From, State) -> +handle_call({requeue, AckTags, ChPid}, From, State) -> + gen_server2:reply(From, ok), case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - reply(ok, State); + noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), store_ch_record(C#cr{acktags = ChAckTags1}), - reply(ok, requeue_and_run(AckTags, State)) + noreply(requeue_and_run(AckTags, State)) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 27a1275a..81cf3cee 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -57,7 +57,7 @@ -type(frame() :: [binary()]). -spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method()) -> frame()). + (channel_number(), amqp_method_record()) -> frame()). -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58c56cc0..3dfc026b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -75,8 +75,8 @@ -spec(start_link/6 :: (channel_number(), pid(), pid(), username(), vhost(), pid()) -> pid()). --spec(do/2 :: (pid(), amqp_method()) -> 'ok'). --spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). +-spec(do/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(do/3 :: (pid(), amqp_method_record(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), qmsg()) -> 'ok'). @@ -344,7 +344,7 @@ with_exclusive_access_or_die(QName, ReaderPid, F) -> expand_queue_name_shortcut(<<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_queue_name_shortcut(<<>>, #ch{ virtual_host = VHostPath, most_recently_declared_queue = MRDQ }) -> rabbit_misc:r(VHostPath, queue, MRDQ); @@ -354,7 +354,7 @@ expand_queue_name_shortcut(QueueNameBin, #ch{ virtual_host = VHostPath }) -> expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = <<>> }) -> rabbit_misc:protocol_error( - not_allowed, "no previously declared queue", []); + not_found, "no previously declared queue", []); expand_routing_key_shortcut(<<>>, <<>>, #ch{ most_recently_declared_queue = MRDQ }) -> MRDQ; @@ -460,13 +460,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, - next_tag = NextDeliveryTag, unacked_message_q = UAMQ}) -> - if DeliveryTag >= NextDeliveryTag -> - rabbit_misc:protocol_error( - command_invalid, "unknown delivery tag ~w", [DeliveryTag]); - true -> ok - end, {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), Participants = ack(TxnKey, Acked), {noreply, case TxnKey of @@ -528,9 +522,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Other -> Other end, - %% In order to ensure that the consume_ok gets sent before - %% any messages are sent to the consumer, we get the queue - %% process to send the consume_ok on our behalf. + %% We get the queue process to send the consume_ok on our + %% behalf. This is for symmetry with basic.cancel - see + %% the comment in that method for why. case with_exclusive_access_or_die( QueueName, ReaderPid, fun (Q) -> @@ -843,14 +837,14 @@ handle_method(#'tx.select'{}, _, State) -> handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.commit'{}, _, State) -> {reply, #'tx.commit_ok'{}, internal_commit(State)}; handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( - not_allowed, "channel is not transactional", []); + precondition_failed, "channel is not transactional", []); handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; @@ -951,10 +945,6 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, not_found, "no binding ~s between ~s and ~s", [RoutingKey, rabbit_misc:rs(ExchangeName), rabbit_misc:rs(QueueName)]); - {error, durability_settings_incompatible} -> - rabbit_misc:protocol_error( - not_allowed, "durability settings of ~s incompatible with ~s", - [rabbit_misc:rs(QueueName), rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, ReturnMethod) end. @@ -989,7 +979,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> QTail, DeliveryTag, Multiple) end; {empty, _} -> - {ToAcc, PrefixAcc} + rabbit_misc:protocol_error( + not_found, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 841161db..eb6f3e49 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -40,7 +40,8 @@ -export([delete/2]). -export([delete_queue_bindings/1, delete_transient_queue_bindings/1]). -export([assert_equivalence/4]). --export([check_type/1, assert_type/2]). +-export([assert_args_equivalence/2]). +-export([check_type/1]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -65,6 +66,7 @@ -spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_equivalence/4 :: (exchange(), atom(), boolean(), amqp_table()) -> 'ok'). +-spec(assert_args_equivalence/2 :: (exchange(), amqp_table()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). -spec(lookup_or_die/1 :: (exchange_name()) -> exchange()). -spec(list/1 :: (vhost()) -> [exchange()]). @@ -76,7 +78,7 @@ -spec(publish/2 :: (exchange(), delivery()) -> {routing_result(), [pid()]}). -spec(add_binding/5 :: (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> - bind_res() | {'error', 'durability_settings_incompatible'}). + bind_res()). -spec(delete_binding/5 :: (exchange_name(), queue_name(), routing_key(), amqp_table(), inner_fun()) -> bind_res() | {'error', 'binding_not_found'}). @@ -184,28 +186,22 @@ check_type(TypeBin) -> T end. -assert_equivalence(X = #exchange{ durable = ActualDurable }, - RequiredType, RequiredDurable, RequiredArgs) - when ActualDurable == RequiredDurable -> - ok = assert_type(X, RequiredType), - ok = assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _Args) -> +assert_equivalence(X = #exchange{ durable = Durable, + type = Type}, + Type, Durable, + RequiredArgs) -> + ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); +assert_equivalence(#exchange{ name = Name }, _Type, _Durable, + _Args) -> rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s with different durable value", + precondition_failed, + "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -assert_type(#exchange{ type = ActualType }, RequiredType) - when ActualType == RequiredType -> - ok; -assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> - rabbit_misc:protocol_error( - not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", - [rabbit_misc:rs(Name), ActualType, RequiredType]). - alternate_exchange_value(Args) -> lists:keysearch(<<"alternate-exchange">>, 1, Args). -assert_args_equivalence(#exchange{ name = Name, +assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic @@ -213,9 +209,9 @@ assert_args_equivalence(#exchange{ name = Name, %% "alternate-exchange". Ae1 = alternate_exchange_value(RequiredArgs), Ae2 = alternate_exchange_value(Args), - if Ae1==Ae2 -> ok; - true -> rabbit_misc:protocol_error( - not_allowed, + if Ae1==Ae2 -> ok; + true -> rabbit_misc:protocol_error( + precondition_failed, "cannot redeclare ~s with inequivalent args", [rabbit_misc:rs(Name)]) end. @@ -400,19 +396,17 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments, InnerFun) -> fun (X, Q, B) -> %% this argument is used to check queue exclusivity; %% in general, we want to fail on that in preference to - %% failing on e.g., the durability being different. + %% anything else InnerFun(X, Q), - if Q#amqqueue.durable and not(X#exchange.durable) -> - {error, durability_settings_incompatible}; - true -> - case mnesia:read({rabbit_route, B}) of - [] -> - sync_binding(B, Q#amqqueue.durable, - fun mnesia:write/3), - {new, X, B}; - [_R] -> - {existing, X, B} - end + case mnesia:read({rabbit_route, B}) of + [] -> + sync_binding(B, + X#exchange.durable andalso + Q#amqqueue.durable, + fun mnesia:write/3), + {new, X, B}; + [_R] -> + {existing, X, B} end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 699250f7..85760edc 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -54,7 +54,11 @@ behaviour_info(callbacks) -> {add_binding, 2}, %% called after bindings have been deleted. - {remove_bindings, 2} + {remove_bindings, 2}, + + %% called when comparing exchanges for equivalence - should return ok or + %% exit with #amqp_error{} + {assert_args_equivalence, 2} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index c3fb2588..4f6eb851 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -61,3 +61,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 62c862a5..4f9712b1 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -59,3 +59,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 0991bf0d..315e8000 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -37,7 +37,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -135,3 +135,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index e42c4518..0e22d545 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -36,7 +36,7 @@ -export([description/0, publish/2]). -export([validate/1, create/1, recover/2, delete/2, - add_binding/2, remove_bindings/2]). + add_binding/2, remove_bindings/2, assert_args_equivalence/2]). -include("rabbit_exchange_type_spec.hrl"). -rabbit_boot_step({?MODULE, @@ -99,3 +99,5 @@ recover(_X, _Bs) -> ok. delete(_X, _Bs) -> ok. add_binding(_X, _B) -> ok. remove_bindings(_X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index b7c6aa96..161dfd84 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -73,14 +73,23 @@ read_frame(ChannelPid) -> end. mainloop(ChannelPid) -> - {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), - case rabbit_framing:method_has_content(MethodName) of - true -> rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, MethodName)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid). + Decoded = read_frame(ChannelPid), + case Decoded of + {method, MethodName, FieldsBin} -> + Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), + case rabbit_framing:method_has_content(MethodName) of + true -> rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, + MethodName)); + false -> rabbit_channel:do(ChannelPid, Method) + end, + ?MODULE:mainloop(ChannelPid); + _ -> + rabbit_misc:protocol_error( + unexpected_frame, + "expected method frame, got ~p instead", + [Decoded]) + end. collect_content(ChannelPid, MethodName) -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), @@ -94,14 +103,14 @@ collect_content(ChannelPid, MethodName) -> payload_fragments_rev = Payload}; true -> rabbit_misc:protocol_error( - command_invalid, + unexpected_frame, "expected content header for class ~w, " "got one for class ~w instead", [ClassId, HeaderClassId]) end; _ -> rabbit_misc:protocol_error( - command_invalid, + unexpected_frame, "expected content header for class ~w, " "got non content header frame instead", [ClassId]) @@ -117,7 +126,7 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> [FragmentBin | Acc]); _ -> rabbit_misc:protocol_error( - command_invalid, + unexpected_frame, "expected content body, got non content body frame instead", []) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index bb880dad..3e03ae0c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -105,6 +105,8 @@ %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closing* %% receive frame -> ignore, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* @@ -121,6 +123,8 @@ %% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closed* %% receive connection.close_ok -> self() ! terminate_connection, %% *closed* %% receive frame -> ignore, *closed* @@ -659,6 +663,12 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); +handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + %% We're already closed or closing, so we don't need to cleanup + %% anything. + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 54c60f5b..3d10dc12 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,17 +50,17 @@ -spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). -spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(send_command/3 :: (pid(), amqp_method(), content()) -> 'ok'). +-spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). +-spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). -spec(send_command_and_signal_back/4 :: (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: - (pid(), pid(), pid(), amqp_method(), content()) -> 'ok'). + (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). -spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method()) -> 'ok'). + (socket(), channel_number(), amqp_method_record()) -> 'ok'). -spec(internal_send_command/5 :: - (socket(), channel_number(), amqp_method(), + (socket(), channel_number(), amqp_method_record(), content(), non_neg_integer()) -> 'ok'). -endif. |