diff options
-rw-r--r-- | Makefile | 14 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 57 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 49 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 40 |
4 files changed, 114 insertions, 46 deletions
@@ -76,6 +76,18 @@ SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) endif endif +# Versions prior to this are not supported +NEED_MAKE := 3.80 +ifneq "$(NEED_MAKE)" "$(firstword $(sort $(NEED_MAKE) $(MAKE_VERSION)))" +$(error Versions of make prior to $(NEED_MAKE) are not supported) +endif + +# .DEFAULT_GOAL introduced in 3.81 +DEFAULT_GOAL_MAKE := 3.81 +ifneq "$(DEFAULT_GOAL_MAKE)" "$(firstword $(sort $(DEFAULT_GOAL_MAKE) $(MAKE_VERSION)))" +.DEFAULT_GOAL=all +endif + all: $(TARGETS) $(DEPS_FILE): $(SOURCES) $(INCLUDES) @@ -268,7 +280,7 @@ install_dirs: mkdir -p $(SBIN_DIR) mkdir -p $(MAN_DIR) -$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML)))) +$(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML)))) # Note that all targets which depend on clean must have clean in their # name. Also any target that doesn't depend on clean should not have diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d337df29..8649ecc7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -726,42 +726,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin, end, %% We use this in both branches, because queue_declare may yet return an %% existing queue. - Finish = fun (#amqqueue{name = QueueName, - durable = Durable1, - auto_delete = AutoDelete1} = Q) - when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> - check_exclusive_access(Q, Owner, strict), - check_configure_permitted(QueueName, State), - %% We need to notify the reader within the channel - %% process so that we can be sure there are no - %% outstanding exclusive queues being declared as the - %% connection shuts down. - case Owner of - none -> ok; - _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q) - end, - Q; - %% non-equivalence trumps exclusivity arbitrarily - (#amqqueue{name = QueueName}) -> - rabbit_misc:protocol_error( - precondition_failed, - "parameters for ~s not equivalent", - [rabbit_misc:rs(QueueName)]) - end, - Q = case rabbit_amqqueue:with( - rabbit_misc:r(VHostPath, queue, QueueNameBin), - Finish) of - {error, not_found} -> - ActualNameBin = - case QueueNameBin of + ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binstring_guid("amq.gen"); Other -> check_name('queue', Other) end, - QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), - Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, - Args, Owner)); - #amqqueue{} = Other -> - Other + QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), + Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, + Args, Owner) of + #amqqueue{name = QueueName, + durable = Durable1, + auto_delete = AutoDelete1} = Q1 + when Durable =:= Durable1, AutoDelete =:= AutoDelete1 -> + check_exclusive_access(Q1, Owner, strict), + check_configure_permitted(QueueName, State), + %% We need to notify the reader within the channel + %% process so that we can be sure there are no + %% outstanding exclusive queues being declared as the + %% connection shuts down. + case Owner of + none -> ok; + _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1) + end, + Q1; + %% non-equivalence trumps exclusivity arbitrarily + #amqqueue{name = QueueName} -> + rabbit_misc:protocol_error( + precondition_failed, "parameters for ~s not equivalent", + [rabbit_misc:rs(QueueName)]) end, return_queue_declare_ok(State, NoWait, Q); diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 73a58f13..f2a903dc 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -53,6 +53,7 @@ -define(CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). -define(SILENT_CLOSE_DELAY, 3). +-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation %--------------------------------------------------------------------------- @@ -102,6 +103,8 @@ %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closing* %% receive frame -> ignore, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* @@ -118,6 +121,8 @@ %% start terminate_connection timer, *closed* %% closed: %% socket close -> *terminate* +%% receive connection.close -> send connection.close_ok, +%% *closed* %% receive connection.close_ok -> self() ! terminate_connection, %% *closed* %% receive frame -> ignore, *closed* @@ -485,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) -> closing -> %% According to the spec, after sending a %% channel.close we must ignore all frames except + %% channel.close and channel.close_ok. In the + %% event of a channel.close, we should send back a %% channel.close_ok. case AnalyzedFrame of {method, 'channel.close_ok', _} -> erase({channel, Channel}); + {method, 'channel.close', _} -> + %% We're already closing this channel, so + %% there's no cleanup to do (notify + %% queues, etc.) + ok = rabbit_writer:send_command(State#v1.sock, + #'channel.close_ok'{}); _ -> ok end, State; @@ -605,27 +618,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, ok = send_on_channel0( Sock, #'connection.tune'{channel_max = 0, - %% set to zero once QPid fix their negotiation - frame_max = 131072, + frame_max = ?FRAME_MAX, heartbeat = 0}), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, client_properties = ClientProperties}}; -handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, - frame_max = FrameMax, +handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, connection = Connection, sock = Sock}) -> - %% if we have a channel_max limit that the client wishes to - %% exceed, die as per spec. Not currently a problem, so we ignore - %% the client's channel_max parameter. - rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), - State#v1{connection_state = opening, - connection = Connection#connection{ - timeout_sec = ClientHeartbeat, - frame_max = FrameMax}}; + if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w < ~w min size", + [FrameMax, ?FRAME_MIN_SIZE]); + (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) -> + rabbit_misc:protocol_error( + not_allowed, "frame_max=~w > ~w max size", + [FrameMax, ?FRAME_MAX]); + true -> + rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat), + State#v1{connection_state = opening, + connection = Connection#connection{ + timeout_sec = ClientHeartbeat, + frame_max = FrameMax}} + end; + handle_method0(#'connection.open'{virtual_host = VHostPath, insist = Insist}, State = #v1{connection_state = opening, @@ -659,6 +678,12 @@ handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); +handle_method0(#'connection.close'{}, State = #v1{connection_state = CS}) + when CS =:= closing; CS =:= closed -> + %% We're already closed or closing, so we don't need to cleanup + %% anything. + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> self() ! terminate_connection, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ecc2613d..cf782497 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,7 @@ all_tests() -> passed = test_pg_local(), passed = test_unfold(), passed = test_parsing(), + passed = test_content_framing(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -353,6 +354,45 @@ test_field_values() -> >>), passed. +%% Test that content frames don't exceed frame-max +test_content_framing(FrameMax, Fragments) -> + [Header | Frames] = + rabbit_binary_generator:build_simple_content_frames( + 1, + #content{class_id = 0, properties_bin = <<>>, + payload_fragments_rev = Fragments}, + FrameMax), + %% header is formatted correctly and the size is the total of the + %% fragments + <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, + BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header), + BodySize = size(list_to_binary(Fragments)), + false = lists:any( + fun (ContentFrame) -> + FrameBinary = list_to_binary(ContentFrame), + %% assert + <<_TypeAndChannel:3/binary, + Size:32/unsigned, + _Payload:Size/binary, + 16#CE>> = FrameBinary, + size(FrameBinary) > FrameMax + end, + Frames), + passed. + +test_content_framing() -> + %% no content + passed = test_content_framing(4096, []), + passed = test_content_framing(4096, [<<>>]), + %% easily fit in one frame + passed = test_content_framing(4096, [<<"Easy">>]), + %% exactly one frame (empty frame = 8 bytes) + passed = test_content_framing(11, [<<"One">>]), + %% more than one frame + passed = test_content_framing(20, [<<"into more than one frame">>, + <<"This will have to go">>]), + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). |