diff options
author | Michael Bridgen <mikeb@lshift.net> | 2010-08-01 23:00:44 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@lshift.net> | 2010-08-01 23:00:44 +0100 |
commit | b5a6465dd30da72a30d0027d3da79afbbcbab4f9 (patch) | |
tree | a5615135bcf2dab8f04f2eac4b2e14f4da27ad3b | |
parent | 357b7dad9e68c8f8799f0bf23501368cfaef94d7 (diff) | |
parent | df11cdaf73e3d984a40d719ad170cabc6f24c5ba (diff) | |
download | rabbitmq-server-bug14647.tar.gz |
Merge default to get json filename changesbug14647
-rw-r--r-- | .hgignore | 2 | ||||
-rw-r--r-- | Makefile | 18 | ||||
-rw-r--r-- | codegen.py | 20 | ||||
-rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
-rw-r--r-- | include/rabbit.hrl | 5 | ||||
-rwxr-xr-x | packaging/common/rabbitmq-server.ocf | 16 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 7 | ||||
-rw-r--r-- | scripts/rabbitmq-server.bat | 8 | ||||
-rw-r--r-- | scripts/rabbitmq-service.bat | 8 | ||||
-rw-r--r-- | src/rabbit.erl | 4 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 12 | ||||
-rw-r--r-- | src/rabbit_binary_generator.erl | 61 | ||||
-rw-r--r-- | src/rabbit_binary_parser.erl | 13 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 15 | ||||
-rw-r--r-- | src/rabbit_exchange.erl | 1 | ||||
-rw-r--r-- | src/rabbit_framing_channel.erl | 79 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 5 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 244 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 22 | ||||
-rw-r--r-- | src/rabbit_types.erl | 6 | ||||
-rw-r--r-- | src/rabbit_writer.erl | 100 | ||||
-rw-r--r-- | src/supervisor2.erl | 109 | ||||
-rw-r--r-- | src/test_sup.erl | 94 |
24 files changed, 510 insertions, 358 deletions
@@ -11,7 +11,7 @@ syntax: regexp ^dist/ ^include/rabbit_framing\.hrl$ ^include/rabbit_framing_spec\.hrl$ -^src/rabbit_framing\.erl$ +^src/rabbit_framing_amqp.*\.erl$ ^src/.*\_usage.erl$ ^rabbit\.plt$ ^basic.plt$ @@ -12,7 +12,7 @@ EBIN_DIR=ebin INCLUDE_DIR=include DOCS_DIR=docs INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl) $(INCLUDE_DIR)/rabbit_framing.hrl -SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing.erl $(USAGES_ERL) +SOURCES=$(wildcard $(SOURCE_DIR)/*.erl) $(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl $(USAGES_ERL) BEAM_TARGETS=$(patsubst $(SOURCE_DIR)/%.erl, $(EBIN_DIR)/%.beam, $(SOURCES)) TARGETS=$(EBIN_DIR)/rabbit.app $(INCLUDE_DIR)/rabbit_framing.hrl $(BEAM_TARGETS) WEB_URL=http://www.rabbitmq.com/ @@ -56,7 +56,8 @@ TARGET_SRC_DIR=dist/$(TARBALL_NAME) SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) -AMQP_SPEC_JSON_FILES=$(AMQP_CODEGEN_DIR)/amqp-0.8.json $(AMQP_CODEGEN_DIR)/rabbitmq-0.8-extensions.json +AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json +AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e @@ -99,11 +100,14 @@ $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app $(EBIN_DIR)/%.beam: erlc $(ERLC_OPTS) -pa $(EBIN_DIR) $< -$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py header $(AMQP_SPEC_JSON_FILES) $@ +$(INCLUDE_DIR)/rabbit_framing.hrl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py --ignore-conflicts header $(AMQP_SPEC_JSON_FILES_0_9_1) $(AMQP_SPEC_JSON_FILES_0_8) $@ -$(SOURCE_DIR)/rabbit_framing.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES) - $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES) $@ +$(SOURCE_DIR)/rabbit_framing_amqp_0_9_1.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_9_1) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_9_1) $@ + +$(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_codegen.py $(AMQP_SPEC_JSON_FILES_0_8) + $(PYTHON) codegen.py body $(AMQP_SPEC_JSON_FILES_0_8) $@ dialyze: $(BEAM_TARGETS) $(BASIC_PLT) $(ERL_EBIN) -eval \ @@ -128,7 +132,7 @@ $(BASIC_PLT): $(BEAM_TARGETS) clean: rm -f $(EBIN_DIR)/*.beam rm -f $(EBIN_DIR)/rabbit.app $(EBIN_DIR)/rabbit.boot $(EBIN_DIR)/rabbit.script $(EBIN_DIR)/rabbit.rel - rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing.erl codegen.pyc + rm -f $(INCLUDE_DIR)/rabbit_framing.hrl $(SOURCE_DIR)/rabbit_framing_amqp_*.erl codegen.pyc rm -f $(DOCS_DIR)/*.[0-9].gz $(DOCS_DIR)/*.man.xml $(DOCS_DIR)/*.erl $(USAGES_ERL) rm -f $(RABBIT_PLT) rm -f $(DEPS_FILE) @@ -315,11 +315,16 @@ def genErl(spec): methods = spec.allMethods() printFileHeader() - print """-module(rabbit_framing). --include("rabbit_framing.hrl"). - + module = "rabbit_framing_amqp_%d_%d" % (spec.major, spec.minor) + if spec.revision != 0: + module = "%s_%d" % (module, spec.revision) + if module == "rabbit_framing_amqp_8_0": + module = "rabbit_framing_amqp_0_8" + print "-module(%s)." % module + print """-include("rabbit_framing.hrl"). + +-export([version/0]). -export([lookup_method_name/1]). - -export([method_id/1]). -export([method_has_content/1]). -export([is_method_synchronous/1]). @@ -395,6 +400,7 @@ def genErl(spec): print """ %% Method signatures -ifdef(use_specs). +-spec(version/0 :: () -> {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -spec(lookup_method_name/1 :: (amqp_method()) -> amqp_method_name()). -spec(method_id/1 :: (amqp_method_name()) -> amqp_method()). -spec(method_has_content/1 :: (amqp_method_name()) -> boolean()). @@ -413,6 +419,10 @@ bitvalue(true) -> 1; bitvalue(false) -> 0; bitvalue(undefined) -> 0. """ + version = "{%d, %d, %d}" % (spec.major, spec.minor, spec.revision) + if version == '{8, 0, 0}': version = '{0, 8, 0}' + print "version() -> %s." % (version) + for m in methods: genLookupMethodName(m) print "lookup_method_name({_ClassId, _MethodId} = Id) -> exit({unknown_method_id, Id})." @@ -471,8 +481,6 @@ def genHrl(spec): methods = spec.allMethods() printFileHeader() - print "-define(PROTOCOL_VERSION_MAJOR, %d)." % (spec.major) - print "-define(PROTOCOL_VERSION_MINOR, %d)." % (spec.minor) print "-define(PROTOCOL_PORT, %d)." % (spec.port) for (c,v,cls) in spec.constants: diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index e53a97c2..a7d064f1 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -888,6 +888,10 @@ <listitem><para>Number of channels using the connection.</para></listitem> </varlistentry> <varlistentry> + <term>protocol</term> + <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 3fd52568..6364d60f 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -36,7 +36,8 @@ -record(vhost, {virtual_host, dummy}). --record(connection, {user, timeout_sec, frame_max, vhost, client_properties}). +-record(connection, {protocol, user, timeout_sec, frame_max, vhost, + client_properties}). -record(content, {class_id, @@ -44,6 +45,7 @@ properties_bin, %% either 'none', or an encoded properties binary %% Note: at most one of properties and properties_bin can be %% 'none' at once. + protocol, %% The protocol under which properties_bin was encoded payload_fragments_rev %% list of binaries, in reverse order (!) }). @@ -74,6 +76,7 @@ -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). -define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/"). +-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8"). -define(ERTS_MINIMUM, "5.6.3"). -define(MAX_WAIT, 16#ffffffff). diff --git a/packaging/common/rabbitmq-server.ocf b/packaging/common/rabbitmq-server.ocf index db0ed70b..b969535a 100755 --- a/packaging/common/rabbitmq-server.ocf +++ b/packaging/common/rabbitmq-server.ocf @@ -40,7 +40,6 @@ ## OCF_RESKEY_nodename ## OCF_RESKEY_ip ## OCF_RESKEY_port -## OCF_RESKEY_cluster_config_file ## OCF_RESKEY_config_file ## OCF_RESKEY_log_base ## OCF_RESKEY_mnesia_base @@ -117,14 +116,6 @@ The IP Port for rabbitmq-server to listen on <content type="integer" default="" /> </parameter> -<parameter name="cluster_config_file" unique="0" required="0"> -<longdesc lang="en"> -Location of the cluster config file -</longdesc> -<shortdesc lang="en">Cluster config file path</shortdesc> -<content type="string" default="" /> -</parameter> - <parameter name="config_file" unique="0" required="0"> <longdesc lang="en"> Location of the config file @@ -184,7 +175,6 @@ RABBITMQ_CTL=$OCF_RESKEY_ctl RABBITMQ_NODENAME=$OCF_RESKEY_nodename RABBITMQ_NODE_IP_ADDRESS=$OCF_RESKEY_ip RABBITMQ_NODE_PORT=$OCF_RESKEY_port -RABBITMQ_CLUSTER_CONFIG_FILE=$OCF_RESKEY_cluster_config_file RABBITMQ_CONFIG_FILE=$OCF_RESKEY_config_file RABBITMQ_LOG_BASE=$OCF_RESKEY_log_base RABBITMQ_MNESIA_BASE=$OCF_RESKEY_mnesia_base @@ -195,7 +185,6 @@ RABBITMQ_SERVER_START_ARGS=$OCF_RESKEY_server_start_args export_vars() { [ ! -z $RABBITMQ_NODE_IP_ADDRESS ] && export RABBITMQ_NODE_IP_ADDRESS [ ! -z $RABBITMQ_NODE_PORT ] && export RABBITMQ_NODE_PORT - [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && export RABBITMQ_CLUSTER_CONFIG_FILE [ ! -z $RABBITMQ_CONFIG_FILE ] && export RABBITMQ_CONFIG_FILE [ ! -z $RABBITMQ_LOG_BASE ] && export RABBITMQ_LOG_BASE [ ! -z $RABBITMQ_MNESIA_BASE ] && export RABBITMQ_MNESIA_BASE @@ -215,11 +204,6 @@ rabbit_validate_partial() { } rabbit_validate_full() { - if [ ! -z $RABBITMQ_CLUSTER_CONFIG_FILE ] && [ ! -e $RABBITMQ_CLUSTER_CONFIG_FILE ]; then - ocf_log err "rabbitmq-server cluster_config_file $RABBITMQ_CLUSTER_CONFIG_FILE does not exist or is not a file"; - exit $OCF_ERR_INSTALLED; - fi - if [ ! -z $RABBITMQ_CONFIG_FILE ] && [ ! -e $RABBITMQ_CONFIG_FILE ]; then ocf_log err "rabbitmq-server config_file $RABBITMQ_CONFIG_FILE does not exist or is not a file"; exit $OCF_ERR_INSTALLED; diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 2261b56e..7283f0e8 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -79,12 +79,6 @@ fi [ -f "${RABBITMQ_LOGS}" ] && cat "${RABBITMQ_LOGS}" >> "${RABBITMQ_LOGS}${RABBITMQ_BACKUP_EXTENSION}" [ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}" -if [ -f "$RABBITMQ_CLUSTER_CONFIG_FILE" ]; then - RABBITMQ_CLUSTER_CONFIG_OPTION="-rabbit cluster_config \"$RABBITMQ_CLUSTER_CONFIG_FILE\"" -else - RABBITMQ_CLUSTER_CONFIG_OPTION="" -fi - RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput' @@ -124,6 +118,5 @@ exec erl \ -os_mon start_disksup false \ -os_mon start_memsup false \ -mnesia dir "\"${RABBITMQ_MNESIA_DIR}\"" \ - ${RABBITMQ_CLUSTER_CONFIG_OPTION} \ ${RABBITMQ_SERVER_START_ARGS} \ "$@" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index a290f935..cebd7d1d 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -103,14 +103,6 @@ if exist "!SASL_LOGS!" ( rem End of log management
-if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
-)
-set CLUSTER_CONFIG=
-if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
-:L1
-
if "!RABBITMQ_MNESIA_DIR!"=="" (
set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index bd117b83..96248f6a 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -136,14 +136,6 @@ if exist "!SASL_LOGS!" ( rem End of log management
-if "!RABBITMQ_CLUSTER_CONFIG_FILE!"=="" (
- set RABBITMQ_CLUSTER_CONFIG_FILE=!RABBITMQ_BASE!\rabbitmq_cluster.config
-)
-set CLUSTER_CONFIG=
-if not exist "!RABBITMQ_CLUSTER_CONFIG_FILE!" GOTO L1
-set CLUSTER_CONFIG=-rabbit cluster_config \""!RABBITMQ_CLUSTER_CONFIG_FILE:\=/!"\"
-:L1
-
if "!RABBITMQ_MNESIA_DIR!"=="" (
set RABBITMQ_MNESIA_DIR=!RABBITMQ_MNESIA_BASE!/!RABBITMQ_NODENAME!-mnesia
)
diff --git a/src/rabbit.erl b/src/rabbit.erl index 18045b94..ada2c38e 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -426,9 +426,9 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "~s~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 03a19961..c76c01ac 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -97,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) -> sender = self(), message = Message}. build_content(Properties, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), #content{class_id = ClassId, properties = Properties, properties_bin = none, + protocol = none, payload_fragments_rev = [BodyBin]}. from_content(Content) -> #content{class_id = ClassId, properties = Props, payload_fragments_rev = FragmentsRev} = - rabbit_binary_parser:ensure_content_decoded(Content), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + rabbit_binary_parser:ensure_content_decoded(Content, + rabbit_framing_amqp_0_9_1), + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 0e6ebe57..f0ec6180 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -41,12 +41,12 @@ % See definition of check_empty_content_body_frame_size/0, an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). --export([build_simple_method_frame/2, - build_simple_content_frames/3, +-export([build_simple_method_frame/3, + build_simple_content_frames/4, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). --export([ensure_content_encoded/1, clear_encoded_content/1]). +-export([ensure_content_encoded/2, clear_encoded_content/1]). -import(lists). @@ -56,20 +56,22 @@ -type(frame() :: [binary()]). --spec(build_simple_method_frame/2 :: - (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()) +-spec(build_simple_method_frame/3 :: + (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), + rabbit_types:protocol()) -> frame()). --spec(build_simple_content_frames/3 :: +-spec(build_simple_content_frames/4 :: (rabbit_channel:channel_number(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). -spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). -spec(encode_properties/2 :: ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). --spec(ensure_content_encoded/1 :: - (rabbit_types:content()) -> rabbit_types:encoded_content()). +-spec(ensure_content_encoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) -> + rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). @@ -77,30 +79,24 @@ %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord) -> - MethodFields = rabbit_framing:encode_method_fields(MethodRecord), +build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> + MethodFields = Protocol:encode_method_fields(MethodRecord), MethodName = rabbit_misc:method_record_type(MethodRecord), - {ClassId, MethodId} = rabbit_framing:method_id(MethodName), + {ClassId, MethodId} = Protocol:method_id(MethodName), create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). -build_simple_content_frames(ChannelInt, - #content{class_id = ClassId, - properties = ContentProperties, - properties_bin = ContentPropertiesBin, - payload_fragments_rev = PayloadFragmentsRev}, - FrameMax) -> - {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), +build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) -> + #content{class_id = ClassId, + properties_bin = ContentPropertiesBin, + payload_fragments_rev = PayloadFragmentsRev} = + ensure_content_encoded(Content, Protocol), + {BodySize, ContentFrames} = + build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), HeaderFrame = create_frame(2, ChannelInt, [<<ClassId:16, 0:16, BodySize:64>>, - maybe_encode_properties(ContentProperties, ContentPropertiesBin)]), + ContentPropertiesBin]), [HeaderFrame | ContentFrames]. -maybe_encode_properties(_ContentProperties, ContentPropertiesBin) - when is_binary(ContentPropertiesBin) -> - ContentPropertiesBin; -maybe_encode_properties(ContentProperties, none) -> - rabbit_framing:encode_properties(ContentProperties). - build_content_frames(FragsRev, FrameMax, ChannelInt) -> BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev); @@ -283,13 +279,16 @@ check_empty_content_body_frame_size() -> ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. -ensure_content_encoded(Content = #content{properties_bin = PropsBin}) +ensure_content_encoded(Content = #content{properties_bin = PropsBin, + protocol = Protocol}, Protocol) when PropsBin =/= 'none' -> Content; -ensure_content_encoded(Content = #content{properties = Props}) -> - Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. +ensure_content_encoded(Content = #content{properties = Props}, Protocol) -> + Content#content{properties_bin = Protocol:encode_properties(Props), + protocol = Protocol}. -clear_encoded_content(Content = #content{properties_bin = none}) -> +clear_encoded_content(Content = #content{properties_bin = none, + protocol = none}) -> Content; clear_encoded_content(Content = #content{properties = none}) -> %% Only clear when we can rebuild the properties_bin later in @@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) -> %% one of properties and properties_bin can be 'none' Content; clear_encoded_content(Content = #content{}) -> - Content#content{properties_bin = none}. + Content#content{properties_bin = none, protocol = none}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 69e34440..1d0a62af 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -34,7 +34,7 @@ -include("rabbit.hrl"). -export([parse_table/1, parse_properties/2]). --export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([ensure_content_decoded/2, clear_decoded_content/1]). -import(lists). @@ -45,8 +45,9 @@ -spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()). -spec(parse_properties/2 :: ([rabbit_framing:amqp_property_type()], binary()) -> [any()]). --spec(ensure_content_decoded/1 :: - (rabbit_types:content()) -> rabbit_types:decoded_content()). +-spec(ensure_content_decoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) + -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). @@ -162,12 +163,12 @@ parse_property(bit, Rest) -> parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> {parse_table(Table), Rest}. -ensure_content_decoded(Content = #content{properties = Props}) +ensure_content_decoded(Content = #content{properties = Props}, _Protocol) when Props =/= 'none' -> Content; -ensure_content_decoded(Content = #content{properties_bin = PropBin}) +ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol) when is_binary(PropBin) -> - Content#content{properties = rabbit_framing:decode_properties( + Content#content{properties = Protocol:decode_properties( Content#content.class_id, PropBin)}. clear_decoded_content(Content = #content{properties = none}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e37aff81..c4ff361d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -78,7 +78,7 @@ -spec(start_link/6 :: (channel_number(), pid(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> pid()). + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -102,10 +102,8 @@ %%---------------------------------------------------------------------------- start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> - {ok, Pid} = gen_server2:start_link( - ?MODULE, [Channel, ReaderPid, WriterPid, - Username, VHost, CollectorPid], []), - Pid. + gen_server2:start_link(?MODULE, [Channel, ReaderPid, WriterPid, + Username, VHost, CollectorPid], []). do(Pid, Method) -> do(Pid, Method, none). @@ -421,7 +419,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + DecodedContent = rabbit_binary_parser:ensure_content_decoded( + Content, rabbit_framing_amqp_0_9_1), IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -949,7 +948,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, WriterPid, Reason) -> {_Close, ReplyCode, ReplyText} = - rabbit_framing:lookup_amqp_exception(Reason), + rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, @@ -1042,7 +1041,7 @@ fold_per_queue(F, Acc0, UAQ) -> Acc0, D). start_limiter(State = #ch{unacked_message_q = UAMQ}) -> - LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + {ok, LPid} = rabbit_limiter:start_link(self(), queue:len(UAMQ)), ok = limit_queues(LPid, State), LPid. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7f7622b2..49f87a22 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -378,7 +378,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> [X] = mnesia:read({rabbit_exchange, ExchangeName}), {maybe_auto_delete(X), Bindings}. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index bc1a2a08..f96e69eb 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,21 +32,22 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal --export([mainloop/1]). +-export([mainloop/2]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> - spawn_link( - fun () -> - %% we trap exits so that a normal termination of the - %% channel or reader process terminates us too. - process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) - end). +start_link(StartFun, StartArgs, Protocol) -> + {ok, spawn_link( + fun () -> + %% we trap exits so that a normal termination of + %% the channel or reader process terminates us too. + process_flag(trap_exit, true), + {ok, ChannelPid} = apply(StartFun, StartArgs), + mainloop(ChannelPid, Protocol) + end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -72,37 +73,43 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid) -> - {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), - case rabbit_framing:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), - rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, ClassId)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid). +mainloop(ChannelPid, Protocol) -> + Decoded = read_frame(ChannelPid), + case Decoded of + {method, MethodName, FieldsBin} -> + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, + ClassId, + Protocol)); + false -> rabbit_channel:do(ChannelPid, Method) + end, + ?MODULE:mainloop(ChannelPid, Protocol); + _ -> + unexpected_frame("expected method frame, " + "got non method frame instead", + []) + end. -collect_content(ChannelPid, ClassId) -> +collect_content(ChannelPid, ClassId, Protocol) -> case read_frame(ChannelPid) of {content_header, ClassId, 0, BodySize, PropertiesBin} -> Payload = collect_content_payload(ChannelPid, BodySize, []), #content{class_id = ClassId, properties = none, properties_bin = PropertiesBin, + protocol = Protocol, payload_fragments_rev = Payload}; {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]); + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]) + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", + [ClassId]) end. collect_content_payload(_ChannelPid, 0, Acc) -> @@ -114,8 +121,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content body, got non content body frame instead", - []) + unexpected_frame("expected content body, " + "got non content body frame instead", + []) end. + +unexpected_frame(ExplanationFormat, Params) -> + rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 878af029..813ccc75 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). @@ -74,8 +74,7 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), - Pid. + gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). shutdown(undefined) -> ok; diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 689f799d..c808499b 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -264,20 +264,9 @@ read_cluster_nodes_config() -> case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> - case application:get_env(cluster_config) of + case application:get_env(cluster_nodes) of undefined -> []; - {ok, DefaultFileName} -> - case file:consult(DefaultFileName) of - {ok, [ClusterNodes]} -> ClusterNodes; - {error, enoent} -> - error_logger:warning_msg( - "default cluster config file ~p does not exist~n", - [DefaultFileName]), - []; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_config, - DefaultFileName, Reason}}) - end + {ok, ClusterNodes} -> ClusterNodes end; {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5514c82..9603faf5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([server_properties/0]). --export([analyze_frame/2]). +-export([analyze_frame/3]). -import(gen_tcp). -import(fprof). @@ -62,8 +62,8 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). + recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels, + protocol, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -249,7 +249,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + protocol = none}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, @@ -437,7 +438,9 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + queue_collector = Collector, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> case all_channels() of [] -> %% Spec says "Exclusive queues may only be accessed by the current @@ -445,16 +448,18 @@ maybe_close(State = #v1{connection_state = closing, %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. rabbit_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), close_connection(State); _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -462,20 +467,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -516,17 +521,20 @@ handle_frame(Type, Channel, Payload, State) -> end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -543,54 +551,61 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> + start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + +handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> + start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + +%% the 0-8 spec, confusingly, defines the version as 8-0 +handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, A, B, C, D}); handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), - throw({bad_header, Other}); + refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, + Protocol, + State = #v1{sock = Sock, connection = Connection}) -> + Start = #'connection.start'{ version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }, + ok = send_on_channel0(Sock, Start, Protocol), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7}. + +refuse_connection(Sock, Exception) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), + throw(Exception). %%-------------------------------------------------------------------------- -handle_method0(MethodName, FieldsBin, State) -> +handle_method0(MethodName, FieldsBin, + State = #v1{connection = #connection{protocol = Protocol}}) -> try - handle_method0(rabbit_framing:decode_method_fields( - MethodName, FieldsBin), + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:Reason -> CompleteReason = case Reason of @@ -612,14 +627,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), - ok = send_on_channel0( - Sock, - #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = 0, frame_max = ?FRAME_MAX, - heartbeat = 0}), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, @@ -645,46 +660,30 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, frame_max = FrameMax}} end; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, + State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}, sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(Sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -697,23 +696,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). - -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -747,6 +731,10 @@ i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> length(all_channels()); +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -770,11 +758,13 @@ send_to_new_channel(Channel, AnalyzedFrame, #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + vhost = VHost, + protocol = Protocol}} = State, + {ok, WriterPid} = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), + {ok, ChPid} = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/6, + [Channel, self(), WriterPid, Username, VHost, Collector], + Protocol), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). @@ -790,25 +780,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = + map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason) -> +map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), + lookup_amqp_exception(Reason, Protocol), ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + _ -> Protocol:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of @@ -823,22 +815,16 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> +lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0b92682a..56aca1d6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -60,6 +60,7 @@ all_tests() -> passed = test_bpqueue(), passed = test_pg_local(), passed = test_unfold(), + passed = test_supervisor_delayed_restart(), passed = test_parsing(), passed = test_content_framing(), passed = test_topic_matching(), @@ -503,8 +504,10 @@ test_content_framing(FrameMax, BodyBin) -> rabbit_binary_generator:build_simple_content_frames( 1, rabbit_binary_generator:ensure_content_encoded( - rabbit_basic:build_content(#'P_basic'{}, BodyBin)), - FrameMax), + rabbit_basic:build_content(#'P_basic'{}, BodyBin), + rabbit_framing_amqp_0_9_1), + FrameMax, + rabbit_framing_amqp_0_9_1), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, @@ -937,8 +940,8 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, + <<"user">>, <<"/">>, self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1067,8 +1070,8 @@ test_memory_pressure_sync(Ch, Writer) -> test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, + <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), MRef = erlang:monitor(process, Ch), receive #'channel.open_ok'{} -> ok @@ -1141,8 +1144,8 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, - self()), + {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, + <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), MRef4 = erlang:monitor(process, Ch4), Writer4 ! sync, @@ -1340,6 +1343,9 @@ bad_handle_hook(_, _, _) -> extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). +test_supervisor_delayed_restart() -> + test_sup:test_supervisor_delayed_restart(). + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2e492b80..3aaf1917 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,8 +39,8 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, user/0, - error/1, ok_or_error/1, ok_or_error2/2, ok/1]). + binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, + user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -133,6 +133,8 @@ -type(connection() :: pid()). +-type(protocol() :: atom()). + -type(user() :: #user{username :: rabbit_access_control:username(), password :: rabbit_access_control:password()}). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 80602038..f90ee734 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([start/4, start_link/4, shutdown/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,14 +48,14 @@ -ifdef(use_specs). --spec(start/3 :: +-spec(start/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> pid()). --spec(start_link/3 :: + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). +-spec(start_link/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> pid()). + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -70,29 +70,31 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(internal_send_command/3 :: +-spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - rabbit_framing:amqp_method_record()) + rabbit_framing:amqp_method_record(), rabbit_types:protocol()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> - spawn(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). - -start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, +start(Sock, Channel, FrameMax, Protocol) -> + {ok, spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}]). + frame_max = FrameMax, + protocol = Protocol}])}. + +start_link(Sock, Channel, FrameMax, Protocol) -> + {ok, spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol}])}. mainloop(State) -> receive @@ -102,35 +104,40 @@ mainloop(State) -> end. handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), State; handle_message({send_command, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), State; handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -171,30 +178,32 @@ shutdown(W) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, + Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), - true = rabbit_framing:method_has_content(MethodName), % assertion + true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax), + Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -214,13 +223,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 03dc0f99..fb4c9b02 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -4,11 +4,34 @@ %% 1) the module name is supervisor2 %% %% 2) there is a new strategy called -%% simple_one_for_one_terminate. This is exactly the same as for -%% simple_one_for_one, except that children *are* explicitly -%% terminated as per the shutdown component of the child_spec. +%% simple_one_for_one_terminate. This is exactly the same as for +%% simple_one_for_one, except that children *are* explicitly +%% terminated as per the shutdown component of the child_spec. %% -%% All modifications are (C) 2010 LShift Ltd. +%% 3) child specifications can contain, as the restart type, a tuple +%% {permanent, Delay} | {transient, Delay} where Delay >= 0. The +%% delay, in seconds, indicates what should happen if a child, upon +%% being restarted, exceeds the MaxT and MaxR parameters. Thus, if +%% a child exits, it is restarted as normal. If it exits +%% sufficiently quickly and often to exceed the boundaries set by +%% the MaxT and MaxR parameters, and a Delay is specified, then +%% rather than stopping the supervisor, the supervisor instead +%% continues and tries to start up the child again, Delay seconds +%% later. +%% +%% Note that you can never restart more frequently than the MaxT +%% and MaxR parameters allow: i.e. you must wait until *both* the +%% Delay has passed *and* the MaxT and MaxR parameters allow the +%% child to be restarted. +%% +%% Also note that the Delay is a *minimum*. There is no guarantee +%% that the child will be restarted within that time, especially if +%% other processes are dying and being restarted at the same time - +%% essentially we have to wait for the delay to have passed and for +%% the MaxT and MaxR parameters to permit the child to be +%% restarted. This may require waiting for longer than Delay. +%% +%% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% %% @@ -43,6 +66,7 @@ %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). +-export([delayed_restart/2]). -define(DICT, dict). @@ -119,6 +143,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +delayed_restart(Supervisor, RestartDetails) -> + gen_server:cast(Supervisor, {delayed_restart, RestartDetails}). + %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -315,6 +342,20 @@ handle_call(which_children, _From, State) -> {reply, Resp, State}. +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) + when ?is_simple(State) -> + {ok, NState} = do_restart(RestartType, Reason, Child, State), + {noreply, NState}; +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) + when not (?is_simple(State)) -> + case get_child(Child#child.name, State) of + {value, Child} -> + {ok, NState} = do_restart(RestartType, Reason, Child, State), + {noreply, NState}; + _ -> + {noreply, State} + end; + %%% Hopefully cause a function-clause as there is no API function %%% that utilizes cast. handle_cast(null, State) -> @@ -480,6 +521,16 @@ restart_child(Pid, Reason, State) -> {ok, State} end. +do_restart({RestartType, Delay}, Reason, Child, State) -> + case restart1(Child, State) of + {ok, NState} -> + {ok, NState}; + {terminate, NState} -> + {ok, _TRef} = timer:apply_after( + trunc(Delay*1000), ?MODULE, delayed_restart, + [self(), {{RestartType, Delay}, Reason, Child}]), + {ok, NState} + end; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State#state.name), restart(Child, State); @@ -500,14 +551,27 @@ do_restart(temporary, Reason, Child, State) -> restart(Child, State) -> case add_restart(State) of {ok, NState} -> - restart(NState#state.strategy, Child, NState); + restart(NState#state.strategy, Child, NState, fun restart/2); {terminate, NState} -> report_error(shutdown, reached_max_restart_intensity, Child, State#state.name), {shutdown, remove_child(Child, NState)} end. -restart(Strategy, Child, State) +restart1(Child, State) -> + case add_restart(State) of + {ok, NState} -> + restart(NState#state.strategy, Child, NState, fun restart1/2); + {terminate, _NState} -> + %% we've reached the max restart intensity, but the + %% add_restart will have added to the restarts + %% field. Given we don't want to die here, we need to go + %% back to the old restarts field otherwise we'll never + %% attempt to restart later. + {terminate, State} + end. + +restart(Strategy, Child, State, Restart) when Strategy =:= simple_one_for_one orelse Strategy =:= simple_one_for_one_terminate -> #child{mfa = {M, F, A}} = Child, @@ -521,9 +585,9 @@ restart(Strategy, Child, State) {ok, NState}; {error, Error} -> report_error(start_error, Error, Child, State#state.name), - restart(Child, State) + Restart(Child, State) end; -restart(one_for_one, Child, State) -> +restart(one_for_one, Child, State, Restart) -> case do_start_child(State#state.name, Child) of {ok, Pid} -> NState = replace_child(Child#child{pid = Pid}, State), @@ -533,25 +597,25 @@ restart(one_for_one, Child, State) -> {ok, NState}; {error, Reason} -> report_error(start_error, Reason, Child, State#state.name), - restart(Child, State) + Restart(Child, State) end; -restart(rest_for_one, Child, State) -> +restart(rest_for_one, Child, State, Restart) -> {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children), ChAfter2 = terminate_children(ChAfter, State#state.name), case start_children(ChAfter2, State#state.name) of {ok, ChAfter3} -> {ok, State#state{children = ChAfter3 ++ ChBefore}}; {error, ChAfter3} -> - restart(Child, State#state{children = ChAfter3 ++ ChBefore}) + Restart(Child, State#state{children = ChAfter3 ++ ChBefore}) end; -restart(one_for_all, Child, State) -> +restart(one_for_all, Child, State, Restart) -> Children1 = del_child(Child#child.pid, State#state.children), Children2 = terminate_children(Children1, State#state.name), case start_children(Children2, State#state.name) of {ok, NChs} -> {ok, State#state{children = NChs}}; {error, NChs} -> - restart(Child, State#state{children = NChs}) + Restart(Child, State#state{children = NChs}) end. %%----------------------------------------------------------------- @@ -769,7 +833,9 @@ supname(N,_) -> N. %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} -%%% RestartType is permanent | temporary | transient +%%% RestartType is permanent | temporary | transient | +%%% {permanent, Delay} | +%%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic @@ -815,10 +881,17 @@ validFunc({M, F, A}) when is_atom(M), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). -validRestartType(permanent) -> true; -validRestartType(temporary) -> true; -validRestartType(transient) -> true; -validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}). +validRestartType(permanent) -> true; +validRestartType(temporary) -> true; +validRestartType(transient) -> true; +validRestartType({permanent, Delay}) -> validDelay(Delay); +validRestartType({transient, Delay}) -> validDelay(Delay); +validRestartType(RestartType) -> throw({invalid_restart_type, + RestartType}). + +validDelay(Delay) when is_number(Delay), + Delay >= 0 -> true; +validDelay(What) -> throw({invalid_delay, What}). validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; diff --git a/src/test_sup.erl b/src/test_sup.erl new file mode 100644 index 00000000..f41793bc --- /dev/null +++ b/src/test_sup.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(test_sup). + +-behaviour(supervisor2). + +-export([test_supervisor_delayed_restart/0, + init/1, start_child/0]). + +test_supervisor_delayed_restart() -> + passed = with_sup(simple_one_for_one_terminate, + fun (SupPid) -> + {ok, _ChildPid} = + supervisor2:start_child(SupPid, []), + test_supervisor_delayed_restart(SupPid) + end), + passed = with_sup(one_for_one, fun test_supervisor_delayed_restart/1). + +test_supervisor_delayed_restart(SupPid) -> + ok = ping_child(SupPid), + ok = exit_child(SupPid), + timer:sleep(10), + ok = ping_child(SupPid), + ok = exit_child(SupPid), + timer:sleep(10), + timeout = ping_child(SupPid), + timer:sleep(1010), + ok = ping_child(SupPid), + passed. + +with_sup(RestartStrategy, Fun) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]), + Res = Fun(SupPid), + exit(SupPid, shutdown), + rabbit_misc:unlink_and_capture_exit(SupPid), + Res. + +init([RestartStrategy]) -> + {ok, {{RestartStrategy, 1, 1}, + [{test, {test_sup, start_child, []}, {permanent, 1}, + 16#ffffffff, worker, [test_sup]}]}}. + +start_child() -> + {ok, proc_lib:spawn_link(fun run_child/0)}. + +ping_child(SupPid) -> + Ref = make_ref(), + get_child_pid(SupPid) ! {ping, Ref, self()}, + receive {pong, Ref} -> ok + after 1000 -> timeout + end. + +exit_child(SupPid) -> + true = exit(get_child_pid(SupPid), abnormal), + ok. + +get_child_pid(SupPid) -> + [{_Id, ChildPid, worker, [test_sup]}] = + supervisor2:which_children(SupPid), + ChildPid. + +run_child() -> + receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, + run_child() + end. |