summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile14
-rw-r--r--src/rabbit_channel.erl57
-rw-r--r--src/rabbit_reader.erl49
-rw-r--r--src/rabbit_tests.erl40
4 files changed, 114 insertions, 46 deletions
diff --git a/Makefile b/Makefile
index 36b045f7..5935f034 100644
--- a/Makefile
+++ b/Makefile
@@ -76,6 +76,18 @@ SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
endif
endif
+# Versions prior to this are not supported
+NEED_MAKE := 3.80
+ifneq "$(NEED_MAKE)" "$(firstword $(sort $(NEED_MAKE) $(MAKE_VERSION)))"
+$(error Versions of make prior to $(NEED_MAKE) are not supported)
+endif
+
+# .DEFAULT_GOAL introduced in 3.81
+DEFAULT_GOAL_MAKE := 3.81
+ifneq "$(DEFAULT_GOAL_MAKE)" "$(firstword $(sort $(DEFAULT_GOAL_MAKE) $(MAKE_VERSION)))"
+.DEFAULT_GOAL=all
+endif
+
all: $(TARGETS)
$(DEPS_FILE): $(SOURCES) $(INCLUDES)
@@ -268,7 +280,7 @@ install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(MAN_DIR)
-$(foreach XML, $(USAGES_XML), $(eval $(call usage_dep, $(XML))))
+$(foreach XML,$(USAGES_XML),$(eval $(call usage_dep, $(XML))))
# Note that all targets which depend on clean must have clean in their
# name. Also any target that doesn't depend on clean should not have
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d337df29..8649ecc7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -726,42 +726,33 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
end,
%% We use this in both branches, because queue_declare may yet return an
%% existing queue.
- Finish = fun (#amqqueue{name = QueueName,
- durable = Durable1,
- auto_delete = AutoDelete1} = Q)
- when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
- check_exclusive_access(Q, Owner, strict),
- check_configure_permitted(QueueName, State),
- %% We need to notify the reader within the channel
- %% process so that we can be sure there are no
- %% outstanding exclusive queues being declared as the
- %% connection shuts down.
- case Owner of
- none -> ok;
- _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q)
- end,
- Q;
- %% non-equivalence trumps exclusivity arbitrarily
- (#amqqueue{name = QueueName}) ->
- rabbit_misc:protocol_error(
- precondition_failed,
- "parameters for ~s not equivalent",
- [rabbit_misc:rs(QueueName)])
- end,
- Q = case rabbit_amqqueue:with(
- rabbit_misc:r(VHostPath, queue, QueueNameBin),
- Finish) of
- {error, not_found} ->
- ActualNameBin =
- case QueueNameBin of
+ ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binstring_guid("amq.gen");
Other -> check_name('queue', Other)
end,
- QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
- Finish(rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
- Args, Owner));
- #amqqueue{} = Other ->
- Other
+ QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
+ Q = case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
+ Args, Owner) of
+ #amqqueue{name = QueueName,
+ durable = Durable1,
+ auto_delete = AutoDelete1} = Q1
+ when Durable =:= Durable1, AutoDelete =:= AutoDelete1 ->
+ check_exclusive_access(Q1, Owner, strict),
+ check_configure_permitted(QueueName, State),
+ %% We need to notify the reader within the channel
+ %% process so that we can be sure there are no
+ %% outstanding exclusive queues being declared as the
+ %% connection shuts down.
+ case Owner of
+ none -> ok;
+ _ -> ok = rabbit_reader_queue_collector:register_exclusive_queue(CollectorPid, Q1)
+ end,
+ Q1;
+ %% non-equivalence trumps exclusivity arbitrarily
+ #amqqueue{name = QueueName} ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "parameters for ~s not equivalent",
+ [rabbit_misc:rs(QueueName)])
end,
return_queue_declare_ok(State, NoWait, Q);
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 73a58f13..f2a903dc 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -53,6 +53,7 @@
-define(CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(FRAME_MAX, 131072). %% set to zero once QPid fix their negotiation
%---------------------------------------------------------------------------
@@ -102,6 +103,8 @@
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closing*
%% receive frame -> ignore, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
@@ -118,6 +121,8 @@
%% start terminate_connection timer, *closed*
%% closed:
%% socket close -> *terminate*
+%% receive connection.close -> send connection.close_ok,
+%% *closed*
%% receive connection.close_ok -> self() ! terminate_connection,
%% *closed*
%% receive frame -> ignore, *closed*
@@ -485,10 +490,18 @@ handle_frame(Type, Channel, Payload, State) ->
closing ->
%% According to the spec, after sending a
%% channel.close we must ignore all frames except
+ %% channel.close and channel.close_ok. In the
+ %% event of a channel.close, we should send back a
%% channel.close_ok.
case AnalyzedFrame of
{method, 'channel.close_ok', _} ->
erase({channel, Channel});
+ {method, 'channel.close', _} ->
+ %% We're already closing this channel, so
+ %% there's no cleanup to do (notify
+ %% queues, etc.)
+ ok = rabbit_writer:send_command(State#v1.sock,
+ #'channel.close_ok'{});
_ -> ok
end,
State;
@@ -605,27 +618,33 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
ok = send_on_channel0(
Sock,
#'connection.tune'{channel_max = 0,
- %% set to zero once QPid fix their negotiation
- frame_max = 131072,
+ frame_max = ?FRAME_MAX,
heartbeat = 0}),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
client_properties = ClientProperties}};
-handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax,
- frame_max = FrameMax,
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
sock = Sock}) ->
- %% if we have a channel_max limit that the client wishes to
- %% exceed, die as per spec. Not currently a problem, so we ignore
- %% the client's channel_max parameter.
- rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax}};
+ if (FrameMax /= 0) and (FrameMax < ?FRAME_MIN_SIZE) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w < ~w min size",
+ [FrameMax, ?FRAME_MIN_SIZE]);
+ (?FRAME_MAX /= 0) and (FrameMax > ?FRAME_MAX) ->
+ rabbit_misc:protocol_error(
+ not_allowed, "frame_max=~w > ~w max size",
+ [FrameMax, ?FRAME_MAX]);
+ true ->
+ rabbit_heartbeat:start_heartbeat(Sock, ClientHeartbeat),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ timeout_sec = ClientHeartbeat,
+ frame_max = FrameMax}}
+ end;
+
handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
@@ -659,6 +678,12 @@ handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
+handle_method0(#'connection.close'{}, State = #v1{connection_state = CS})
+ when CS =:= closing; CS =:= closed ->
+ %% We're already closed or closing, so we don't need to cleanup
+ %% anything.
+ ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
self() ! terminate_connection,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ecc2613d..cf782497 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,7 @@ all_tests() ->
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
+ passed = test_content_framing(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -353,6 +354,45 @@ test_field_values() ->
>>),
passed.
+%% Test that content frames don't exceed frame-max
+test_content_framing(FrameMax, Fragments) ->
+ [Header | Frames] =
+ rabbit_binary_generator:build_simple_content_frames(
+ 1,
+ #content{class_id = 0, properties_bin = <<>>,
+ payload_fragments_rev = Fragments},
+ FrameMax),
+ %% header is formatted correctly and the size is the total of the
+ %% fragments
+ <<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
+ BodySize:64/unsigned, _Rest/binary>> = list_to_binary(Header),
+ BodySize = size(list_to_binary(Fragments)),
+ false = lists:any(
+ fun (ContentFrame) ->
+ FrameBinary = list_to_binary(ContentFrame),
+ %% assert
+ <<_TypeAndChannel:3/binary,
+ Size:32/unsigned,
+ _Payload:Size/binary,
+ 16#CE>> = FrameBinary,
+ size(FrameBinary) > FrameMax
+ end,
+ Frames),
+ passed.
+
+test_content_framing() ->
+ %% no content
+ passed = test_content_framing(4096, []),
+ passed = test_content_framing(4096, [<<>>]),
+ %% easily fit in one frame
+ passed = test_content_framing(4096, [<<"Easy">>]),
+ %% exactly one frame (empty frame = 8 bytes)
+ passed = test_content_framing(11, [<<"One">>]),
+ %% more than one frame
+ passed = test_content_framing(20, [<<"into more than one frame">>,
+ <<"This will have to go">>]),
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).